Parallel Processing
Working Asynchronously
Last week, we used threads to run work in parallel. In modern application design, threads are used to take workload off the main thread so that the actual application (e.g. a GUI or microservice) remains responsive to external input.
Using threads in Java is fairly simple:
- Extend Thread and override run(), or create a new thread with reference to a Runnable.
- Call start() on the thread instance to signal that the thread is ready to run.
- Optionally use join() on the thread instance to wait for its completion.
Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello from my custom thread!");
    }
});
t.start();
System.out.println("Hello from the application main thread!");
System.out.println("Waiting for thread to complete...");
t.join();
System.out.println("All done.");
From the main application’s point of view, the code in run() is run asynchronously: the execution of the main method continues right after the t.start() call, independent of the execution of run().
On a multi-core computer, the threads may actually be active at the same time, one thread per core.
There are two major drawbacks of this method:
- There is no (direct) way to provide arguments to, or obtain results from, the computation done in the thread. This is also reflected in the fact that run() does not take any arguments and has return type void.
- There is no (direct) way to communicate potential exceptions that occur in the thread to the main application; the signature of run() cannot be modified.
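The second drawback can be observed directly. The following is a minimal sketch (the class and method names are illustrative and not part of the lecture code): a thread whose run() throws does not disturb its caller at all, and the exception only becomes visible if an UncaughtExceptionHandler is registered on the thread.

```java
import java.util.concurrent.atomic.AtomicReference;

class LostExceptionDemo {
    // Starts a thread that fails and returns whatever the handler observed.
    static Throwable runFailingThread() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread t = new Thread(() -> {
            throw new IllegalStateException("boom");
        });
        // Without a handler, the JVM only prints the stack trace to stderr.
        t.setUncaughtExceptionHandler((thread, ex) -> seen.set(ex));
        t.start();
        try {
            t.join(); // returns normally, even though run() failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable ex = runFailingThread();
        System.out.println("Main thread unaffected; handler saw: " + ex);
    }
}
```

Note that join() gives no hint that anything went wrong; the caller has to look for the failure elsewhere.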
How could these common issues be solved?
A possible solution is to store both arguments and results inside the thread or Runnable, and relay possible exceptions to whoever tries to retrieve the result.
public class JobWithResult implements Runnable {
    private int a, b, result;
    private Exception e;

    JobWithResult(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        try {
            this.result = a / b;
        } catch (Exception e) {
            System.out.println("Oops, an exception raised; storing for later.");
            this.e = e;
        }
    }

    int getResult() throws Exception {
        if (e != null)
            throw e;
        return result;
    }
}
This class can be used in your main program as follows:
JobWithResult jwr = new JobWithResult(4, 0);
Thread t = new Thread(jwr);
t.start();
System.out.println("Waiting for thread to complete...");
t.join();
System.out.println("All done.");
try {
    System.out.println(jwr.getResult());
} catch (Exception e) {
    System.out.println("Ooops: " + e.toString());
}
The Future of Callables
The code above features three key parts.
1. A Runnable which may use external information and stores results as attributes. Possible exceptions raised in the run() method are caught and stored as an attribute.
2. The main program creates and starts the thread to execute the run() method of the above instance.
3. The main program uses the thread’s join() to wait for completion, and our modified Runnable’s getResult() to obtain the result (or raise a possible exception).
Let’s refactor the code to separate out the recurring scheme (the mechanics) from the actual business logic (the contents of run()).
For (1), we’ll use a different interface to reflect a return type.
interface Callable<V> {
    V call();
}
For (3), we’ll introduce an interface that will allow us to retrieve the future result from the Callable, once it’s done.
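interface Future<T> {
    // waits for the task; reports failures of the task as ExecutionException
    T get() throws InterruptedException, ExecutionException;
}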
The remaining part (2), which organizes the thread logistics to run the actual task, goes into a third interface:
interface Executor {
    <T> Future<T> async(Callable<T> task);
}
As you can see, the Executor ties things together.
Let’s see how we can make the threading work.
The incoming Callable needs to be wrapped into a thread, while watching for possible exceptions.
Furthermore, it must return a Future that, on get(), waits for the thread to finish and then either throws any exception that occurred or returns the result.
import java.util.concurrent.ExecutionException;

class SimpleExecutor implements Executor {
    @Override
    public <T> Future<T> async(Callable<T> task) {
        // create anonymous Future instance
        return new Future<T>() {
            Thread t;             // handle on the thread (get needs to wait!)
            T result;             // save the result
            ExecutionException e; // in case something goes wrong

            // instance initializer block (acts as constructor)
            {
                // create a new thread and start it
                // the runnable "watches" over the task
                t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            result = task.call();
                        } catch (Exception ex) {
                            e = new ExecutionException(ex);
                        }
                    }
                });
                t.start();
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                // wait for it...
                t.join();
                // anything fishy?
                if (e != null)
                    throw e;
                // all good, return result!
                return result;
            }
        };
    }
}
Putting the pieces together, the SimpleExecutor simplifies asynchronous execution tremendously:
Executor ex = new SimpleExecutor();
int a = 4, b = 0;
Future<Integer> f1 = ex.async(new Callable<Integer>() {
    @Override
    public Integer call() {
        // can use variables from outer scope
        return a / b;
    }
});
// do other things if you like...
try {
    System.out.println(f1.get());
} catch (ExecutionException e) {
    System.out.println("The thread raised an exception: "
            + e.getCause());
} catch (InterruptedException e) {
    // interrupted while waiting for the thread to finish
    Thread.currentThread().interrupt();
}
Callables, Futures and Executors since Java 5
Since Java 5, these three parts are realized as Callable (1), ExecutorService (2) and Future (3). The “real” interfaces are slightly different to allow for more flexible use.
interface Callable<V> {
    V call() throws Exception;
}
interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    boolean isCancelled();
    boolean isDone();
}
interface ExecutorService extends Executor {
    void execute(Runnable command); // inherited from Executor
    <T> Future<T> submit(Callable<T> task);
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    // ...
}
Java 5 also introduced a number of different executors. Here are a few examples:
Executor executor = Executors.newSingleThreadExecutor();
// executor = Executors.newCachedThreadPool(); // reuses threads
// executor = Executors.newFixedThreadPool(5); // use 5 threads
executor.execute(new Runnable() {
    public void run() {
        System.out.println("Hello world!");
    }
});
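To obtain a result rather than just run a Runnable, a Callable can be handed to submit() on an ExecutorService. The following is a minimal sketch, assuming a small fixed pool and a one-second timeout; the class and method names are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SubmitDemo {
    // Computes a / b on a pool thread and describes the outcome as text.
    static String divideAsync(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> a / b;
            Future<Integer> future = pool.submit(task);
            // wait at most one second for the result
            return "result: " + future.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // whatever call() threw is available as the cause
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException | TimeoutException e) {
            return "gave up: " + e.getClass().getSimpleName();
        } finally {
            pool.shutdown(); // otherwise the pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(divideAsync(8, 2)); // result: 4
        System.out.println(divideAsync(4, 0)); // failed: ArithmeticException
    }
}
```

Creating a pool per call is only done here to keep the sketch self-contained; an application would normally share one pool.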
Asynchronous on Steroids
The Executor introduced above allows a Callable to be executed asynchronously; the calling thread only waits (blocks) when it calls get() on the Future.
This is convenient enough for basic use cases (e.g. re-query the database after a swipe-down gesture on mobile), but quickly reaches its limits for more complex operations, where the result of one operation gets processed by another.
For the remainder of this chapter, let’s consider this rather frequent example:
For a cloud-backed application, you write a displayStatus() method to
- sign in with username and password; on success you gain a token to
- retrieve the user’s status; once received,
- greet the user.
public class Workflow {
    // async() is an instance method, so we need an executor instance;
    // here we use the SimpleExecutor from above
    static final Executor executor = new SimpleExecutor();

    static void displayStatus() throws ExecutionException, InterruptedException {
        final String user = "riko493";
        final String pass = "12345"; // spaceballs, anyone? :-)

        // log in...
        Future<String> f1 = executor.async(new Callable<String>() {
            public String call() {
                System.out.println("Authenticating with " + user + ":" + pass);
                return "secrettoken";
            }
        });
        final String token = f1.get();

        // retrieve user
        Future<String> f2 = executor.async(new Callable<String>() {
            public String call() {
                System.out.println("Retrieving user details with token " + token);
                return "lightly sleep deprived, should get haircut";
            }
        });
        final String details = f2.get();
        System.out.println("Welcome " + user + "! You look " + details);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        displayStatus(); // blocks until completed!!
    }
}
While this works, the displayStatus() method blocks until the status is displayed, thus blocking the remainder of the application from being responsive.
Also, reacting to execution exceptions becomes tricky: which parts should be guarded by try-catch?
It would be desirable to be able to chain asynchronous actions that depend on each other. The CompletableFuture class added in Java 8 provides exactly that:
class CompletableFuture<T> implements CompletionStage<T>, Future<T> {
    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        // ...
    }
    <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
        // ...
    }
    CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        // ...
    }
    CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
        // ...
    }
    // and much more...
}
The supplyAsync method accepts a Supplier<U>, which is almost identical to Callable<V> (except that it cannot throw checked exceptions), but rooted in java.util.function for semantic reasons.
This method is used to create a CompletableFuture that can then be chained with other actions.
Note the generic typing of thenApply and thenAccept (and their Async variants): although the CompletableFuture is generic in T, thenApply returns a CompletableFuture<U>, i.e. a (possibly) different generic type, and thenAccept returns a CompletableFuture<Void>, since a consumer produces no value.
The thenApply method takes a @FunctionalInterface Function to map values of ? super T to ? extends U.
The thenAccept method takes a @FunctionalInterface Consumer to act on ? super T.
The exceptionally method accepts a function that maps a Throwable to ? extends T, handling exceptions gracefully by providing alternate input to the next stage in line.
Since all those are functional interfaces, we can use method references and lambda notation for a cleaner codebase.
CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> "riko493:12345")
    .thenApplyAsync(creds -> {
        System.out.println("Authenticating with " + creds);
        return "secrettoken";
    })
    .thenApplyAsync(token -> {
        System.out.println("Retrieving status with token=" + token);
        return "in the mood for holidays";
    })
    .thenAccept(status -> System.out.println(status))
    .exceptionally(ex -> {
        System.out.println("Oops, something went wrong: " + ex);
        return null;
    });
System.out.println("All done!");
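To make the typing of the chained stages visible, here is a small sketch that spells out the intermediate types instead of chaining everything in one expression. The class name and the fallback value -1 are assumptions made for illustration only.

```java
import java.util.concurrent.CompletableFuture;

class ChainTypesDemo {
    // Parses text asynchronously; falls back to -1 if parsing fails.
    static int parseOrDefault(String text) {
        CompletableFuture<String> raw = CompletableFuture.supplyAsync(() -> text);
        // thenApply changes the element type: String -> Integer
        CompletableFuture<Integer> parsed = raw.thenApply(Integer::parseInt);
        // exceptionally keeps the type and replaces a failure with a value
        CompletableFuture<Integer> safe = parsed.exceptionally(ex -> -1);
        return safe.join(); // blocks; only used here to return a plain int
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42"));        // 42
        System.out.println(parseOrDefault("forty-two")); // -1
        // thenAccept consumes the value, so the resulting stage is typed Void
        CompletableFuture<Void> done = CompletableFuture.supplyAsync(() -> "hello")
                .thenAccept(s -> System.out.println(s.length())); // 5
        done.join();
    }
}
```

In a responsive application one would avoid the final join() and attach further stages instead; it is used here so that the sketch can return a value.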
You can even combine (synchronize) multiple CompletableFuture instances by using the thenCombineAsync method:
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)
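As a minimal sketch of how this signature is used: two independent stages are started, and the BiFunction runs once both have completed. The two fetch methods are hypothetical stand-ins for remote calls and simply return constants.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    // Hypothetical lookups, standing in for two independent remote calls.
    static CompletableFuture<String> fetchName() {
        return CompletableFuture.supplyAsync(() -> "riko493");
    }

    static CompletableFuture<Integer> fetchUnread() {
        return CompletableFuture.supplyAsync(() -> 3);
    }

    static String greeting() {
        // here T = String, U = Integer, V = String
        CompletableFuture<String> combined = fetchName().thenCombineAsync(
                fetchUnread(),
                (name, unread) -> name + " has " + unread + " unread messages");
        return combined.join(); // blocks until both stages and fn are done
    }

    public static void main(String[] args) {
        System.out.println(greeting()); // riko493 has 3 unread messages
    }
}
```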
ChatGPT 4 on CompletableFuture
Retrieved on 2024-01-08, query was “give me a detailed walk-through for completablefuture in java”
CompletableFuture in Java is an advanced tool for asynchronous programming, introduced in Java 8. It allows you to write non-blocking code by running a task on a separate thread than the main application thread and notifying the main thread about its progress, completion, or failure. Here’s a detailed walkthrough:
1. Basic Creation and Usage
- Creating a CompletableFuture:
  CompletableFuture<String> future = new CompletableFuture<>();
- Completing a CompletableFuture: You can manually complete it:
  future.complete("Result");
- Running Asynchronously: Run a task asynchronously and return a new CompletableFuture:
  CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
      // some long-running task
  });
  For a task that returns a result:
  CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      return "Result";
  });
2. Combining CompletableFutures
- thenApply: Transform the result of a CompletableFuture:
  CompletableFuture<String> transformed = future.thenApply(result -> result.toUpperCase());
- thenCompose: Chain another CompletableFuture:
  CompletableFuture<String> chainedFuture = future.thenCompose(result -> anotherAsyncMethod(result));
- thenCombine: Combine two independent CompletableFutures:
  CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
3. Handling Errors
- exceptionally: Handle exceptions from the CompletableFuture:
  CompletableFuture<String> handled = future.exceptionally(ex -> "Default Value");
4. Waiting for Completion
- join: Waits for the CompletableFuture to complete and retrieves its result (blocking):
  String result = future.join();
- get: Similar to join but throws checked exceptions:
  try {
      String result = future.get();
  } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
  }
5. Asynchronous Methods
- thenApplyAsync, thenComposeAsync, etc.: Variants of thenApply, thenCompose, etc., that run their tasks asynchronously.
6. Completing on Timeout
- orTimeout / completeOnTimeout (Java 9+):
  future.orTimeout(1, TimeUnit.SECONDS);
  future.completeOnTimeout("Default Value", 1, TimeUnit.SECONDS);
7. Combining Multiple Futures
- allOf: Wait for all CompletableFutures to complete:
  CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
- anyOf: Wait for any of the CompletableFutures to complete:
  CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3);
Conclusion
CompletableFuture is a powerful tool for asynchronous programming, allowing for complex non-blocking workflows and efficient handling of concurrent tasks. It’s important to handle exceptions properly and understand the differences between blocking (join/get) and non-blocking methods. With Java 9 and later versions, additional features like timeout handling make CompletableFuture even more versatile.
Other Concurrency Utilities
Since Java 5, there are a number of other concurrency utilities available.
- java.util.concurrent provides thread-safe collections:
  - BlockingQueue: FIFO queue that blocks on write-if-full and read-if-empty
  - ConcurrentMap: map with atomic read/write operations
- ThreadLocalRandom provides random numbers with per-thread randomization
- AtomicInteger allows atomic integer increment/decrement
- CountDownLatch allows for atomic counting and waiting
A frequent use for the CountDownLatch is batch processing across multiple threads:
List<String> filesToProcess = ...;
CountDownLatch latch = new CountDownLatch(filesToProcess.size());
ExecutorService ex = Executors.newFixedThreadPool(5);
for (String f : filesToProcess) {
    ex.submit(() -> {
        System.out.println("processing file " + f);
        // countdown when done!
        latch.countDown();
    });
}
latch.await();
// shutdown the threads to allow main() to quit.
ex.shutdown();
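Two of the other utilities listed above, BlockingQueue and AtomicInteger, can be sketched in a similar way. The example below is an illustration under assumptions of my own: a producer/consumer pair, a queue capacity of 2, and a "poison" value of -1 that tells the consumer to stop.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class QueueDemo {
    static final int POISON = -1; // marker telling the consumer to stop

    // Sends 1..n through a small bounded queue and sums them in a consumer thread.
    static int sumViaQueue(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        AtomicInteger sum = new AtomicInteger(); // shared between both threads
        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int v = queue.take(); v != POISON; v = queue.take()) {
                    sum.addAndGet(v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 1; i <= n; i++) {
                queue.put(i); // put() blocks while the queue is full
            }
            queue.put(POISON);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumViaQueue(100)); // 5050
    }
}
```

The small capacity forces the producer to wait for the consumer, which is the back-pressure behaviour described as "blocks on write-if-full".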