Timeouts with Java 8 CompletableFuture: You’re probably doing it wrong

This was meant to be a quick blog post on using CompletableFuture in Java 8 for timeouts, but researching the topic led to a lot of interesting discoveries! For an introduction to CompletableFuture in Java 8, I would recommend reading this blog post.

Many articles that describe similar approaches to timeouts do not mention the pitfalls, so I hope I can shed some light on them.

The objective: There are many situations where we want to run some function in our application with a timeout. E.g., a dependency called over HTTP might need to be timed out, or a long computation that would keep the user waiting might be better abandoned.

Let’s explore the various options we have for running something against a timeout.

Running with a timeout the synchronous way

This is the simplest option, provided by CompletableFuture.get() with a timeout argument:

// The synchronous method
CompletableFuture<Integer> timeoutFuture = new CompletableFuture<>();
try {
    // Blocks the calling thread for at most one second
    timeoutFuture.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

A TimeoutException is thrown in the calling thread if the timeout period passes, which in this case always happens since timeoutFuture is never completed.

The code here is really simple, but it has one major disadvantage: it is synchronous. The calling thread blocks inside get() and cannot do anything else until the future completes or the timeout expires.

Asynchronous way of doing timeouts

We will explore a couple of ways, and discuss why you might prefer one over the other.

Using a ScheduledExecutorService with CompletableFuture API

CompletableFuture<Integer> timeoutFuture = new CompletableFuture<>();
// Schedule a task which fails timeoutFuture after 100 milliseconds
scheduler.schedule(() -> timeoutFuture.completeExceptionally(new TimeoutException()), 100, TimeUnit.MILLISECONDS);
// Completes as soon as either future completes
CompletableFuture<Object> finalFuture = CompletableFuture.anyOf(future, timeoutFuture);

Let’s talk a bit about how this works. finalFuture completes as soon as the first of the two futures completes, with that future’s value or exception. scheduler is an instance of ScheduledExecutorService, which allows us to run tasks after a given delay.
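The snippet above can be wrapped into a reusable helper. This is only a minimal sketch: the class name TimeoutSketch and the methods withTimeout and demo are hypothetical names, and it uses applyToEither instead of anyOf so that the result keeps its type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
  // Hypothetical helper: the returned future fails with a TimeoutException
  // if `future` has not completed within the given delay.
  static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future,
      ScheduledExecutorService scheduler, long delay, TimeUnit unit) {
    CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
    scheduler.schedule(
        () -> timeoutFuture.completeExceptionally(new TimeoutException()),
        delay, unit);
    // applyToEither is the typed, two-future counterpart of anyOf
    return future.applyToEither(timeoutFuture, value -> value);
  }

  // Wraps a future that is never completed, so the timeout always wins.
  static String demo() {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    try {
      CompletableFuture<String> never = new CompletableFuture<>();
      return withTimeout(never, scheduler, 100, TimeUnit.MILLISECONDS)
          .exceptionally(t -> "timed out")
          .join();
    } finally {
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "timed out"
  }
}
```

Note that the timeout task stays scheduled until it fires, even when the wrapped future completes first. If that matters, keep the ScheduledFuture returned by schedule() and cancel it once the wrapped future completes.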

How many threads for ScheduledExecutorService?

The ScheduledExecutorService doesn’t do anything particularly time-consuming. It just completes the timeout future when the delay expires. Since the scheduled task does almost no work, we might as well use a single-threaded executor.

WARNINGS

    • Note that subsequent stages chained with .thenApply() may run on the scheduler thread, so it is best not to let anything run there: chain with .thenApplyAsync() and pass some other executor explicitly. Otherwise, size the thread pool of the ScheduledExecutorService for that extra work instead of using a single thread.
    • On a timeout the combined future fails, but the underlying task keeps running, and there is no easy way to cancel the underlying task execution! What problem could this cause for your application? Let me explain with an example of a hypothetical application.
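The first warning can be made visible with a small experiment. The sketch below is illustrative only: the class name, the thread names "scheduler" and "worker", and the "fallback" value are all made up. It relies on the usual CompletableFuture behaviour that a non-async continuation registered before completion runs on the completing thread (the Javadoc only promises that it "may").

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ContinuationThreadSketch {
  // Returns the names of the threads that ran the two continuations.
  static String[] demo() {
    ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "scheduler"));
    ExecutorService workers =
        Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
    try {
      CompletableFuture<String> timeoutFuture = new CompletableFuture<>();
      // Non-async continuation: runs on whichever thread completes the future
      CompletableFuture<String> sameThread = timeoutFuture
          .exceptionally(t -> "fallback")
          .thenApply(v -> Thread.currentThread().getName());
      // Async continuation: handed over to the explicitly supplied executor
      CompletableFuture<String> otherThread = timeoutFuture
          .exceptionally(t -> "fallback")
          .thenApplyAsync(v -> Thread.currentThread().getName(), workers);
      // Both continuations are registered before the timeout is scheduled
      scheduler.schedule(
          () -> timeoutFuture.completeExceptionally(new TimeoutException()),
          100, TimeUnit.MILLISECONDS);
      return new String[] { sameThread.join(), otherThread.join() };
    } finally {
      scheduler.shutdown();
      workers.shutdown();
    }
  }

  public static void main(String[] args) {
    String[] names = demo();
    System.out.println("thenApply ran on:      " + names[0]);
    System.out.println("thenApplyAsync ran on: " + names[1]);
  }
}
```

If the thenApply stage did anything slow, it would hold up the single scheduler thread and delay every other timeout scheduled on it.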

Our hypothetical application

Our application runs 10 dependency calls. Each dependency call has a timeout of 100ms and the calls are made concurrently. We have 10 threads for the executor which runs these calls.

What would be the throughput of our application?

Ideally, we should be able to repeat the complete cycle every 100ms (the timeout we have configured).

import java.util.concurrent.*;

class MyApplication {
  public static Integer getValue() {
    System.out.println("I am called");
    // Simulating a long network call of 1 second in the worst case
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 10;
  }
  public static void main(String[] args) {
    ExecutorService executor = new ThreadPoolExecutor(10, 10,
        0L, TimeUnit.MILLISECONDS,
        // This is an unbounded Queue. This should never be used
        // in real life. That is the first step to failure.
        new LinkedBlockingQueue<Runnable>());
    // A single thread is enough, since the scheduler only completes futures
    ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<?>[] allFutures = new CompletableFuture<?>[10];
    // We want to call the dummy service 10 times
    for (int i=0; i<10; ++i) {
      CompletableFuture<Integer> dependencyFuture = CompletableFuture.supplyAsync(() -> getValue(), executor);
      CompletableFuture<Integer> futureTimeout = new CompletableFuture<>();
      schedulerService.schedule(() ->
        futureTimeout.completeExceptionally(new TimeoutException()), 100, TimeUnit.MILLISECONDS);
      CompletableFuture<Object> result = CompletableFuture.anyOf(dependencyFuture, futureTimeout);
      // Map a timeout to null so that join() below does not throw
      allFutures[i] = result.exceptionally(t -> null);
    }
    // Finally wait for all futures to join
    CompletableFuture.allOf(allFutures).join();
    System.out.println("All futures completed");
    System.out.println(executor.toString());
    // Let the JVM exit once the still-running tasks have finished
    executor.shutdown();
    schedulerService.shutdown();
  }
}

We get a very interesting output from our example (note that I have modified the executor to print some additional information):

Running pool-1-thread-1
I am called
Running pool-1-thread-2
I am called
Running pool-1-thread-3
I am called
Running pool-1-thread-4
I am called
Running pool-1-thread-5
I am called
Running pool-1-thread-6
I am called
Running pool-1-thread-7
I am called
Running pool-1-thread-8
I am called
Running pool-1-thread-9
I am called
Running pool-1-thread-10
I am called
### Good till here, each thread spawns the particular dependency ###
All futures completed
### All futures have completed ###
MainLauncher$ModifiedExecutorService@34c45dca[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 0]
###  WHOOPS, our Executor says that we still are running 10 threads ###
Finished running
Finished running
Finished running
Finished running
Finished running
Finished running
Finished running
Finished running
Finished running
Finished running
###  Our threads complete now(roughly at their sleep endings) ###

Let’s try to understand what happened:

  • A CompletableFuture is not tied to the task that computes its value. Unlike Future.cancel(true) on a future returned by an executor, CompletableFuture.cancel() only marks the future as cancelled and never interrupts the running task. Look at this blog post for more details.
  • Our expected throughput was 10 cycles per second (one cycle every 100ms). What do we get instead? The executor threads stay busy for the full second, so we can run only one cycle per second and our throughput is just 1! It can hurt your application in a serious way.
  • If you model your dependencies so that they never fail on their own, they’ll consume an executor thread forever!
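The first point is easy to check in isolation. In the sketch below (the class and method names are hypothetical, and the 300ms sleep just stands in for a slow dependency) we call cancel(true) on a CompletableFuture while its task is sleeping, and record whether the task ever saw an interrupt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class CompletableFutureCancelSketch {
  // Returns true only if the running task was interrupted by cancel(true).
  static boolean wasInterrupted() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean interrupted = new AtomicBoolean(false);
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    try {
      CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        started.countDown();
        try {
          Thread.sleep(300); // stand-in for a slow dependency call
        } catch (InterruptedException e) {
          interrupted.set(true);
        } finally {
          finished.countDown();
        }
        return 10;
      }, executor);
      started.await();     // make sure the task is really running
      future.cancel(true); // completes the future with a CancellationException
      finished.await();    // the task itself still sleeps to the end
      return interrupted.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("task interrupted: " + wasInterrupted()); // prints "task interrupted: false"
  }
}
```

The future reports itself as cancelled straight away, yet the executor thread stays occupied for the whole sleep, which is exactly what the executor statistics in the output above show.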

Possible Fix: Go back to Java Futures

The way out of here is to go back to using Java Futures instead of the CompletableFuture API. The reason is that a Future returned by an executor is tied to the task it runs, and cancel(true) interrupts the thread running that task.
I am a believer in learning through examples, so let’s quickly pick up the same application we used above.

import java.util.concurrent.*;

class MyApplication {
  public static Integer getValue() {
    System.out.println("I am called");
    // Simulating a long network call of 1 second in the worst case
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 10;
  }
  public static void main(String[] args) {
    ExecutorService executor = new ThreadPoolExecutor(10, 10,
        0L, TimeUnit.MILLISECONDS,
        // This is an unbounded Queue. This should never be used
        // in real life. That is the first step to failure.
        new LinkedBlockingQueue<Runnable>());
    // A single thread is enough, since the scheduler only cancels futures
    ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<?>[] allFutures = new CompletableFuture<?>[10];
    // We want to call the dummy service 10 times
    for (int i=0; i<10; ++i) {
      allFutures[i] = CompletableFuture.supplyAsync(() -> {
          // Instead of using CompletableFuture.supplyAsync, directly create a future from the executor
          Future<Integer> future = executor.submit(() -> getValue());
          // cancel(true) interrupts the thread that is running the task
          schedulerService.schedule(() -> future.cancel(true), 100, TimeUnit.MILLISECONDS);
          try {
            return future.get();
          } catch (InterruptedException | ExecutionException | CancellationException e) {
            // pass
          }
          // You can choose to return a dummy value here
          return null;
        });
    }
    // Finally wait for all futures to join
    CompletableFuture.allOf(allFutures).join();
    System.out.println("All futures completed");
    System.out.println(executor.toString());
    // Let the JVM exit
    executor.shutdown();
    schedulerService.shutdown();
  }
}

Here’s the output:

Running pool-1-thread-2
I am called
Running pool-1-thread-3
I am called
Running pool-1-thread-1
I am called
Finished running
Finished running
Finished running
Running pool-1-thread-4
I am called
Running pool-1-thread-5
I am called
Running pool-1-thread-6
I am called
Finished running
Finished running
Finished running
Running pool-1-thread-7
I am called
Running pool-1-thread-9
I am called
Running pool-1-thread-8
I am called
Finished running
Finished running
Finished running
Running pool-1-thread-10
I am called
Finished running
### Looks good ###
All futures completed
MainLauncher$ModifiedExecutorService@5b480cf9[Running, pool size = 10, active threads = 0, queued tasks = 0, completed tasks = 10]
### Aha! 0 queued and active threads after completion ###

What are we doing here?
Same as the previous case, except that now we cancel the future that the executor returned for the task. cancel(true) interrupts the worker thread, Thread.sleep() throws an InterruptedException, and the task stops occupying the executor. Ideally, here we get the throughput of 10 that we expect.
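To see the interruption on its own, here is a minimal sketch (the class and method names are hypothetical) that cancels a plain Future while its task is sleeping and records whether the task received an InterruptedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class FutureCancelSketch {
  // Returns true if the running task was interrupted by Future.cancel(true).
  static boolean wasInterrupted() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean interrupted = new AtomicBoolean(false);
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    try {
      Future<Integer> future = executor.submit(() -> {
        started.countDown();
        try {
          Thread.sleep(1000); // stand-in for a slow dependency call
        } catch (InterruptedException e) {
          interrupted.set(true); // the sleep is cut short by the interrupt
        } finally {
          finished.countDown();
        }
        return 10;
      });
      started.await();     // make sure the task is really running
      future.cancel(true); // interrupts the worker thread
      finished.await();    // returns quickly because the sleep was aborted
      return interrupted.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("task interrupted: " + wasInterrupted()); // prints "task interrupted: true"
  }
}
```

This only works because Thread.sleep() reacts to interruption. A task that runs a long computation without checking Thread.currentThread().isInterrupted(), or that blocks in I/O which ignores interrupts, would keep its thread even after cancel(true).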
BUT…
This approach has caveats too. Java Futures are blocking, so I had to wrap the call inside CompletableFuture.supplyAsync() to make it asynchronous. This means that we need an executor to run this code too. In the above example, I haven’t specified any executor, so it runs on the common ForkJoinPool, but you might need to use a custom executor here.

Unfortunately, this means that we are using some threads to run the tasks and some threads just to wait for them to finish. How many waiting threads do you need? The same number that we use for running the tasks. So effectively, we are using double the number of threads that we actually need.
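One possible way around the extra waiting threads, which the example above does not use, is to let the task itself complete a CompletableFuture and have the scheduler cancel the executor's Future on timeout. This is only a sketch under that assumption: supplyWithTimeout and the class name are hypothetical, and it still depends on the task reacting to interruption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class InterruptingTimeoutSketch {
  // Hypothetical helper: runs `task` on `executor`; on timeout the returned
  // future fails with TimeoutException and the task's thread is interrupted.
  static <T> CompletableFuture<T> supplyWithTimeout(Callable<T> task,
      ExecutorService executor, ScheduledExecutorService scheduler,
      long delay, TimeUnit unit) {
    CompletableFuture<T> result = new CompletableFuture<>();
    // The task completes the CompletableFuture itself, so no thread has to wait on get()
    Future<?> running = executor.submit(() -> {
      try {
        result.complete(task.call());
      } catch (Throwable t) {
        result.completeExceptionally(t);
      }
    });
    scheduler.schedule(() -> {
      // Interrupt the task only if the timeout actually won the race
      if (result.completeExceptionally(new TimeoutException())) {
        running.cancel(true);
      }
    }, delay, unit);
    return result;
  }

  // Returns { future timed out?, task was interrupted? }
  static boolean[] demo() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch interrupted = new CountDownLatch(1);
    try {
      CompletableFuture<Integer> future = supplyWithTimeout(() -> {
        try {
          Thread.sleep(1000); // stand-in for a slow dependency call
        } catch (InterruptedException e) {
          interrupted.countDown();
        }
        return 10;
      }, executor, scheduler, 100, TimeUnit.MILLISECONDS);
      boolean timedOut = future.handle((v, t) -> t instanceof TimeoutException).join();
      boolean taskInterrupted = interrupted.await(500, TimeUnit.MILLISECONDS);
      return new boolean[] { timedOut, taskInterrupted };
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) {
    boolean[] r = demo();
    System.out.println("timed out: " + r[0] + ", task interrupted: " + r[1]);
  }
}
```

Here the only threads in use are the ones running the tasks plus the single scheduler thread. The trade-off is that every task has to be wrapped by the helper rather than passed to CompletableFuture.supplyAsync() directly.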
The moral of the story is, asynchronous programming is hard! A lot of your assumptions may be incorrect. Definitely try to understand how to use the CompletableFuture model before using it!


7 Responses

  1. Gagandeep Singh says:

    I like the moral of the story 🙂

  2. rayhon1014 says:

    good post. One point to add is future cancel may not be able to interrupt IO blocking thread like sleeping thread in your example. You may need to get down to socket timeout or close to free up the thread.

  3. Kunal,
    The moral may be different if you are using better tools 🙂
    In my library (https://github.com/vsilaev/tascalate-concurrent) both interruptible CompletionStage-s and timeout are addressed, so you can do:
    Executor myExecutor = Executors.newFixedThreadPool(4);

    CompletableTask
    .supplyAsync( () -> someIoBoundOrBlockingMethodCall(), myExecutor )
    .orTimeout (Duration.ofMillis(1000) );
    Btw, it doesn’t require double number of threads to run.

  4. Kisna says:

    There may be larger Caveat. Here in your thread, you are just sleeping, so cancel will call thread.interrupt() and stop your getValue() thread that is in wait() mode using InterruptedException and return.

    However, if there is a lot of computation, this may not work unless the runnable checks if thread is actually interrupted or not, say, in a loop. Even in the case of just waiting for some I/O from your Http request that uses socket connection, it “may work” depending on the underlying implementation. The ideal solution is to pass this timeout to your HttpClient as a socket timeout to exit early.

    And also, instead of creating two executor pools, one for the CompletableFuture.runAsync() and one for the schedulerService.schedule to wait() to finish, you can wrap futures in another wrapper CompletableFuture.allOf(…) with its own timeout. This wrapper future will timeout or return some or all futures that have completed. https://stackoverflow.com/questions/65220495/completablefuture-with-timeout

    Would really appreciate if can you also fix your last example as it has compilation issues and wrong names.

  5. Mr Person says:

    allFutures isn’t defined in the example.
