A Future represents the pending result of an asynchronous computation. It offers a method — get — that returns the result of the computation when it’s done.

The problem is that a call to get is blocking until the computation is done. This is quite restrictive and can quickly make the asynchronous computation pointless.

CompletableFuture<T> extends Future<T> and makes it completable in an ad hoc manner. This is a big deal, considering that Future objects were limited before Java 8.
This new and improved CompletableFuture has mainly two benefits:

It can be explicitly completed by calling the complete() method without any synchronous wait. It allows values of any type to be available in the future with default return values, even if the computation did not complete, using default/intermediate results.

It also allows you to build a pipeline data process in a series of actions. You can find a number of patterns for CompletableFutures such as creating a CompletableFuture from a task, or building a CompletableFuture chain.

CompletableFuture implements Future and CompletionStage interfaces.

A CompletionStage is a promise that ensures the computation will eventually be done and the great thing about the CompletionStage it offers a vast selection of methods that let you attach callbacks that will be executed on completion. This way we can build systems in a non-blocking fashion.

CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);

supplyAsync takes a Supplier containing the code we want to execute asynchronously — in our case the createRandomNumber or createId method. If you had worked with Future then you may surprise where the Executor gone. If you want, you can still pass Executor as a second argument to the supplyAsync method. However, if you do not pass any Executor as a second argument then it will be submitted to the default ForkJoinPool.commonPool().

CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
                .thenAccept(this::store);

thenAccept is one of many ways to add a callback. It takes a Consumer — in our case a method called store — which handles the result of the preceding computation when it is done but it does not return any value.

If you want to continue passing values from one callback to another and also if you want to return value and pass that to next callback then you can use thenApply callback. In our example, thenApply takes an argument a method called convert.

In the example above, everything will be executed on the same thread. This results in the last message waiting for the first message to complete.

Now look at the below example each message is submitted as a separate task to the ForkJoinPool.commonPool(). This results in both the sendURL callbacks being executed when the preceding calculation(findURL) is done.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
cf.thenAccept(System.out::println);
resp1.thenAccept(System.out::println);
resp2.thenAccept(System.out::println);

The key is — the asynchronous version can be convenient when you have several callbacks dependent on the same computation.

As we know sometimes we may face things go wrong as we have already faced working with Future but CompletableFuture has a feature to handle such situation in a nice way, using execptionally.

exceptionally gives us a chance to recover by taking an alternative function that will be executed if preceding calculation fails with an exception. This way succeeding callbacks can continue with the alternative result as input.

CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
                .thenAccept(this::notify);
cf.get();

Sometimes you need to create a callback that is dependent on the result of two computations. This is where thenCombine comes into the picture.

CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);

First, we have started two asynchronous jobs — creating random number and finding a URL. Then we use thenCombine to do with the result of these two computations by defining our method notify.

We have covered the scenario where you were dependent on two computations for further process. But, what about when you just need the result of one of them?

CompletableFuture<String> cfName = CompletableFuture.supplyAsync(this::findName);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
cfName.acceptEither(cfUrl, this::sendMsg);

The complete example is given below

package com.roytuts.concurrent.completablefuture;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFutureExample example = new CompletableFutureExample();

        // chain example
        example.exampleOp();

        // explicitly complete
        example.completeOp();

        // return known value
        example.knownValueOp();

        // separate task using the async suffix
        example.separateTasksOp();

        // exceptionally
        // example.exceptionallyOp();

        // thenCombine
        example.combineOp();

        // acceptEither
        example.acceptEitherOp();
    }

    public void exampleOp() throws InterruptedException, ExecutionException {
        // block and wait for the result
        // CompletableFuture allows you to build pipeline executed
        // asynchronously within the ForkJoinPool
        CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
                .thenAccept(this::store);
        System.out.println("null : " + cf.get());
    }

    public void completeOp() throws InterruptedException, ExecutionException {
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);
        // if you want to control concurrency using ExecutorService, i.e., if
        // you do not want to submit the task to default
        // ForkJoinPool.commonPool(), then pass es as second argument to the
        // supplyAsync method
        /*
         * ExecutorService es = Executors.newFixedThreadPool(2);
         * CompletableFuture<Double> cf =
         * CompletableFuture.supplyAsync(this::createRandomNumber, es);
         */
        // explicitly complete and return default value if you do not want to
        // wait for the task to complete
        cf.complete(434345.8765);
        System.out.println("Random Number : " + cf.get());
    }

    public void knownValueOp() throws InterruptedException, ExecutionException {
        // create a completed CompletableFuture in advance that returns a known
        // value
        // this might come in handy in testing environment, in case you will
        // want to combine that known value with one that needs to be computed
        CompletableFuture<String> cf = CompletableFuture.completedFuture("I'm done");
        cf.isDone(); // return true
        cf.join(); // return "I'm done"
        System.out.println("Known value : " + cf.get());
    }

    public void separateTasksOp() throws InterruptedException, ExecutionException {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
        // execute the below task on separate thread on completion of previous
        // task cf
        CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
        // execute the below task on separate thread on completion of previous
        // task cf
        CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
        cf.thenAccept(System.out::println);
        // cf.thenAcceptAsync(System.out::println);
        resp1.thenAccept(System.out::println);
        // resp1.thenAcceptAsync(System.out::println);
        resp2.thenAccept(System.out::println);
        // resp2.thenAcceptAsync(System.out::println);
    }

    public void exceptionallyOp() throws InterruptedException, ExecutionException {
        CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
                .thenAccept(this::notify);
        cf.get();
    }

    public void combineOp() throws InterruptedException, ExecutionException {
        // what we want to do with the result of these two computations
        CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
        CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
        CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);
        System.out.println("Combine : " + resp.get());
    }

    public void acceptEitherOp() throws InterruptedException, ExecutionException {
        // what we want to do with the result of any of the two computations
        CompletableFuture<String> cfName = CompletableFuture.supplyAsync(this::findName);
        CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
        cfName.acceptEither(cfUrl, this::sendMsg);
    }

    private Double createRandomNumber() {
        return Math.random();
    }

    private UUID createId() {
        return UUID.randomUUID();
    }

    private String convert(UUID input) {
        return input.toString();
    }

    private String findURL() {
        return "roytuts.com";
    }

    private String findName() {
        return "Roy Tutorials";
    }

    private String sendURL(String url) {
        return "Sending " + url + " to destination";
    }

    private String failureMsg() {
        throw new RuntimeException("Failured due to Exception");
    }

    private String notify(Throwable t) {
        throw new RuntimeException(t.getMessage());
    }

    private void notify(String msg) {
        System.out.println("The message : " + msg);
    }

    private void sendMsg(String msg) {
        System.out.println("The message : " + msg);
    }

    private String notify(Double num, String msg) {
        return msg + "," + num;
    }

    private void store(String message) {
        System.out.println("message : " + message);
    }

}

The output

message : 2a0f39c0-6e38-453d-bb37-3256c160afbd
null : null
Random Number : 434345.8765
Known value : I'm done
roytuts.com
Sending roytuts.com to destination
Sending roytuts.com to destination
Combine : roytuts.com,0.16379160582907637
The message : Roy Tutorials

Thanks for reading.

I am a professional Web developer, Enterprise Application developer, Software Engineer and Blogger. Connect me on Roy Tutorials Twitter Facebook  Google Plus Linkedin Or Email Me

Leave a Reply

Your email address will not be published. Required fields are marked *