Structured concurrency in Java - Loom

Abstract

Purpose: support easy-to-use, high-throughput, lightweight concurrency and new programming models on the Java platform

Platform threads:

  • abstraction over OS threads
  • limited scalability (when using thread-per-request model)
  • need for pooling

Virtual threads:

  • lightweight user threads
  • highly scalable
  • no need for pooling

How to create virtual threads:

Thread.ofVirtual().start(runnable);
// equivalent shorthand
Thread.startVirtualThread(runnable);
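
The "no need for pooling" point can be illustrated with a thread-per-task executor. The following is a minimal sketch, assuming JDK 21 or later (where virtual threads are a final feature); the class name VirtualThreadsDemo, the method runBlockingTasks, and the 50 ms sleep are made up for illustration.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadsDemo {
    // Hypothetical helper: runs taskCount blocking tasks, one new virtual thread
    // per task, and returns how many of them finished on a virtual thread.
    static int runBlockingTasks(int taskCount) {
        var completed = new AtomicInteger();
        // No pool sizing: the executor starts a fresh virtual thread for every task.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    // Blocking here suspends only the virtual thread.
                    Thread.sleep(Duration.ofMillis(50));
                    if (Thread.currentThread().isVirtual()) {
                        completed.incrementAndGet();
                    }
                    return null;
                });
            }
        } // close() waits for all submitted tasks to finish
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runBlockingTasks(10_000));
    }
}
```

Running the same number of blocking tasks on platform threads would normally call for a bounded pool; here each task simply gets its own thread.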

Benefits of structured concurrency:

  • error handling with short-circuiting
  • cancellation propagation
  • clarity
  • observability

CompletableFuture vs structured concurrency

Get all

  +--> reserveVenue --+
  |                   |
--+--> bookHotel -----+-->
  |                   |
  +--> buySupplies ---+

CompletableFuture example:

// instance method: the this:: method references below need an instance
public void createEvent() {
  var future1 = CompletableFuture.supplyAsync(this::reserveVenue);
  var future2 = CompletableFuture.supplyAsync(this::bookHotel);
  var future3 = CompletableFuture.supplyAsync(this::buySupplies);
  var futureEvent = CompletableFuture.allOf(future1, future2, future3)
    .exceptionally(t -> {
      throw new RuntimeException(t);
    })
    .thenApply(ignored -> {
      var venue = future1.join();
      var hotel = future2.join();
      var supplies = future3.join();
      return new Event(venue, hotel, supplies);
    });
  System.out.println("Event " + futureEvent.join());
}
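
One thing the CompletableFuture version does not provide is short-circuiting: allOf completes only after every future has completed, and a failed task does not cancel its siblings. The sketch below is an illustration under assumptions, not part of the original example; the class name AllOfWaitsDemo, the two stand-in tasks, and the 200 ms delay are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AllOfWaitsDemo {
    // Returns true if the slow task still ran to completion
    // even though its sibling had already failed.
    static boolean slowSiblingStillCompleted() {
        // Stand-in for a task that fails right away.
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("venue unavailable");
        });
        // Stand-in for a slower task that nobody cancels.
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            pause(200);
            return "hotel booked";
        });
        try {
            CompletableFuture.allOf(failing, slow).join();
        } catch (CompletionException e) {
            // The failure is reported only once every future has completed.
        }
        return slow.isDone() && !slow.isCompletedExceptionally();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(slowSiblingStillCompleted());
    }
}
```

With ShutdownOnFailure, the scope is shut down as soon as one subtask fails and the remaining subtasks are cancelled.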

Structured concurrency example:

// uses the StructuredTaskScope preview API as shipped in JDK 21 (needs --enable-preview)
public void createEvent() {
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(this::reserveVenue);
    var task2 = scope.fork(this::bookHotel);
    var task3 = scope.fork(this::buySupplies);
    // wait for all subtasks; rethrow the first failure, if any
    scope.join().throwIfFailed();
    var venue = task1.get();
    var hotel = task2.get();
    var supplies = task3.get();
    System.out.println("Event " + new Event(venue, hotel, supplies));
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}

Get first

  +--> getWeatherFromSource1 ---+
  |                             |
--+--> getWeatherFromSource2 ---+-->
  |                             |
  +--> getWeatherFromSource3 ---+

CompletableFuture example:

// anyOf completes with whichever future finishes first, even if that one failed,
// so a single fast failure fails the whole call
public static void getWeather() {
  var future1 = CompletableFuture.supplyAsync(() -> getWeatherFromSource1("Paris"));
  var future2 = CompletableFuture.supplyAsync(() -> getWeatherFromSource2("Paris"));
  var future3 = CompletableFuture.supplyAsync(() -> getWeatherFromSource3("Paris"));
  CompletableFuture.anyOf(future1, future2, future3)
    .exceptionally(t -> {
      throw new RuntimeException(t);
    })
    .thenAccept(weather -> System.out.println(weather))
    .join();
}
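
Getting "first successful result" semantics out of CompletableFuture takes a hand-written helper. The following is one possible sketch, not a JDK API: firstSuccessful and the class FirstSuccessful are hypothetical names, and the three futures in main are stand-ins for the weather sources.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FirstSuccessful {
    // Hypothetical helper (not part of the JDK): completes with the first
    // successful result, or exceptionally once every source future has failed.
    static <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
        var result = new CompletableFuture<T>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new NoSuchElementException("no futures given"));
            return result;
        }
        // Counts how many more failures it takes before all sources have failed.
        var failuresLeft = new AtomicInteger(futures.size());
        for (var future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value); // first success wins, later calls are no-ops
                } else if (failuresLeft.decrementAndGet() == 0) {
                    result.completeExceptionally(error); // every source failed
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-ins for the three weather sources; the first one fails immediately.
        CompletableFuture<String> source1 =
            CompletableFuture.failedFuture(new IllegalStateException("source1 down"));
        CompletableFuture<String> source2 = CompletableFuture.supplyAsync(() -> "sunny");
        CompletableFuture<String> source3 = CompletableFuture.supplyAsync(() -> "sunny");
        System.out.println(firstSuccessful(List.of(source1, source2, source3)).join());
    }
}
```

Even with this helper, the losing futures keep running after a winner is found; the helper only ignores their results.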

Structured concurrency example:

// this example does not fail if one of the tasks throws an exception;
// it fails only if every task fails
public static void getWeather() {
  try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
    scope.fork(() -> getWeatherFromSource1("Paris"));
    scope.fork(() -> getWeatherFromSource2("Paris"));
    scope.fork(() -> getWeatherFromSource3("Paris"));
    var weather = scope.join().result();
    System.out.println(weather);
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}

Get then

-> getCurrentCustomer +-> getSavingsData +--> calculateOffer
                      |                  |
                      +-> getLoansData --+

CompletableFuture example:

// instance method: the this:: method references below need an instance
public void getOfferForCustomer() {
  var future1 = CompletableFuture.supplyAsync(this::getCurrentCustomer);
  var future2 = future1.thenApplyAsync(this::getSavingsData);
  var future3 = future1.thenApplyAsync(this::getLoansData);
  var customer = future1
    .exceptionally(t -> {
      throw new RuntimeException(t);
    })
    .join();
  var future = future2
    .thenCombine(future3, (savings, loans) -> new CustomerDetails(customer, savings, loans))
    .thenApplyAsync(this::calculateOffer)
    .exceptionally(t -> {
      throw new RuntimeException(t);
    });
  System.out.println("Offer " + future.join());
}

Structured concurrency example:

public static void getOfferForCustomer() {
  var customer = getCurrentCustomer();
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> getSavingsData(customer));
    var task2 = scope.fork(() -> getLoansData(customer));
    scope.join().throwIfFailed();
 
    var savings = task1.get();
    var loans = task2.get();
    var details = new CustomerDetails(customer, savings, loans);
 
    var task3 = scope.fork(() -> calculateOffer(details));
    scope.join().throwIfFailed();
    var offer = task3.get();
    // or just offer = calculateOffer(details);
    // no need to use concurrency for the last step
 
    System.out.println(offer);
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}

Shutdown policies

ShutdownOnFailure

  • stops when one of the tasks “fails”
  • cancels other tasks
  • useful when all tasks must complete successfully

ShutdownOnSuccess

  • stops when one of the tasks “succeeds”
  • cancels other tasks
  • useful when you want any one of the tasks to complete successfully

Custom

  • extends StructuredTaskScope
  • overrides handleComplete
  • useful when you want custom logic, for example when only some of the tasks need to complete successfully