Structured concurrency in Java - Loom
Abstract
Purpose: support easy-to-use, high-throughput, lightweight concurrency and new programming models on the Java platform
Platform threads:
- abstraction over OS threads
- limited scalability (when using the thread-per-request model)
- need for pooling

Virtual threads:
- lightweight user-mode threads
- highly scalable
- no need for pooling
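
The "no need for pooling" point can be illustrated with a minimal sketch, assuming JDK 21 or later (where virtual threads are a final feature). The class and method names (VirtualThreadPerTask, runBlockingTasks) are made up for illustration; Executors.newVirtualThreadPerTaskExecutor() is the standard JDK factory that starts a new virtual thread for every submitted task instead of reusing pooled threads.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadPerTask {

    // Runs taskCount blocking tasks, each on its own virtual thread,
    // and returns how many of them finished.
    static int runBlockingTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // No pool sizing: every submitted task gets a fresh virtual thread.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    Thread.sleep(Duration.ofMillis(50)); // stands in for blocking I/O
                    return completed.incrementAndGet();
                });
            }
        } // close() waits until all submitted tasks have finished
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runBlockingTasks(10_000));
    }
}
```

With platform threads, the same thread-per-task style would typically need a bounded pool, because each thread maps to an OS thread.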
 
How to create virtual threads:
Thread.ofVirtual().start(runnable);
// equivalent shorthand
Thread.startVirtualThread(runnable);

Benefits of structured concurrency:
- error handling with short-circuiting
- cancellation propagation
- clarity
- observability
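
To see what short-circuiting and cancellation propagation address, here is a minimal sketch of the unstructured behaviour using only CompletableFuture. The class name, method name, and messages are hypothetical. It relies on the documented behaviour that allOf completes only once all of its futures have completed, so a failure in one task neither cancels its sibling nor is reported early.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UnstructuredSiblings {

    // One task fails immediately while its sibling is still working.
    static String outcome() {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("venue unavailable");
        });
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200); // stands in for a slow remote call
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "hotel booked";
        });

        try {
            CompletableFuture.allOf(failing, slow).join();
            return "no failure";
        } catch (CompletionException e) {
            // The failure surfaces only after the slow sibling has finished:
            // nothing cancelled it, and its result is simply discarded by the caller.
            return "failed with '" + e.getCause().getMessage()
                    + "', sibling result: " + slow.join();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome());
    }
}
```

A scope with a shutdown-on-failure policy is meant to close exactly this gap: the first failure ends the wait and the remaining subtasks are cancelled.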
 
CompletableFuture vs structured concurrency
Get all
  +--> reserveVenue --+
  |                   |
--+--> bookHotel -----+-->
  |                   |
  +--> buySupplies ---+
CompletableFuture example:
// instance method: `this::` method references are not allowed in a static method
public void createEvent() {
  // start the three independent steps concurrently
  var future1 = CompletableFuture.supplyAsync(this::reserveVenue);
  var future2 = CompletableFuture.supplyAsync(this::bookHotel);
  var future3 = CompletableFuture.supplyAsync(this::buySupplies);
  var futureEvent = CompletableFuture.allOf(future1, future2, future3)
    .exceptionally(t -> {
      throw new RuntimeException(t);
    })
    .thenApply(ignored -> {
      // all three futures are complete here, so join() does not block
      var venue = future1.join();
      var hotel = future2.join();
      var supplies = future3.join();
      return new Event(venue, hotel, supplies);
    });
  System.out.println("Event " + futureEvent.join());
}

Structured concurrency example:
// instance method: `this::` method references are not allowed in a static method
public void createEvent() {
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(this::reserveVenue);
    var task2 = scope.fork(this::bookHotel);
    var task3 = scope.fork(this::buySupplies);
    // wait for all subtasks; rethrow the first failure, if any
    scope.join().throwIfFailed();
    var venue = task1.get();
    var hotel = task2.get();
    var supplies = task3.get();
    System.out.println("Event " + new Event(venue, hotel, supplies));
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}

Get first
  +--> getWeatherFromSource1 ---+
  |                             |
--+--> getWeatherFromSource2 ---+-->
  |                             |
  +--> getWeatherFromSource3 ---+
CompletableFuture example:
// anyOf completes with whichever future finishes first, even if it failed,
// so this example cannot recover when one task throws an exception
public static void getWeather() {
  var future1 = CompletableFuture.supplyAsync(() -> getWeatherFromSource1("Paris"));
  var future2 = CompletableFuture.supplyAsync(() -> getWeatherFromSource2("Paris"));
  var future3 = CompletableFuture.supplyAsync(() -> getWeatherFromSource3("Paris"));
  CompletableFuture.anyOf(future1, future2, future3)
    .exceptionally(t -> {
      throw new RuntimeException(t);
    })
    .thenAccept(weather -> System.out.println(weather))
    .join();
}

Structured concurrency example:
// this example does not fail if one of the tasks throws an exception;
// result() throws only if no task completes successfully
public static void getWeather() {
  try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
    scope.fork(() -> getWeatherFromSource1("Paris"));
    scope.fork(() -> getWeatherFromSource2("Paris"));
    scope.fork(() -> getWeatherFromSource3("Paris"));
    // the first successful result wins; the remaining tasks are cancelled
    var weather = scope.join().result();
    System.out.println(weather);
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}

Get then
-> getCurrentCustomer +-> getSavingsData +--> calculateOffer
                      |                  |
                      +-> getLoansData --+
CompletableFuture example:
// instance method: `this::` method references are not allowed in a static method
public void getOfferForCustomer() {
  var future1 = CompletableFuture.supplyAsync(this::getCurrentCustomer);
  // both lookups start as soon as the customer is available
  var future2 = future1.thenApplyAsync(this::getSavingsData);
  var future3 = future1.thenApplyAsync(this::getLoansData);
  var customer = future1
    .exceptionally(t -> {
      throw new RuntimeException(t);
    })
    .join();
  var future = future2
    .thenCombine(future3, (savings, loans) -> new CustomerDetails(customer, savings, loans))
    .thenApplyAsync(this::calculateOffer)
    .exceptionally(t -> {
      throw new RuntimeException(t);
    });
  System.out.println("Offer " + future.join());
}

Structured concurrency example:
public static void getOfferForCustomer() {
  var customer = getCurrentCustomer();
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> getSavingsData(customer));
    var task2 = scope.fork(() -> getLoansData(customer));
    scope.join().throwIfFailed();
 
    var savings = task1.get();
    var loans = task2.get();
    var details = new CustomerDetails(customer, savings, loans);
 
    var task3 = scope.fork(() -> calculateOffer(details));
    scope.join().throwIfFailed();
    var offer = task3.get();
    // or just offer = calculateOffer(details);
    // no need to use concurrency for the last step
 
    System.out.println(offer);
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}

Shutdown policies
ShutdownOnFailure
- stops when one of the tasks fails
- cancels the other tasks
- useful when all tasks must complete successfully
 
ShutdownOnSuccess
- stops when one of the tasks succeeds
- cancels the other tasks
- useful when a single successful result is enough
 
Custom
- extend StructuredTaskScope
- override handleComplete
- useful when you need custom logic, e.g. only some of the tasks must complete successfully
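
A custom policy could look like the sketch below. It assumes the JDK 21 preview API (compile and run with --enable-preview), where handleComplete receives a Subtask; earlier incubator builds and later JDK releases shape this API differently. The policy itself (keep every successful result, ignore failures, never shut down early) and the names CollectSuccessesScope, successes, CustomScopeDemo, and fetchAvailable are hypothetical, chosen only to illustrate the two steps listed above.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;

// Hypothetical policy: collect every successful result and ignore failures.
class CollectSuccessesScope<T> extends StructuredTaskScope<T> {
    // handleComplete may be called from several threads, so use a concurrent queue
    private final Queue<T> successes = new ConcurrentLinkedQueue<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            successes.add(subtask.get());
        }
    }

    @Override
    public CollectSuccessesScope<T> join() throws InterruptedException {
        super.join();
        return this; // narrowed return type so calls can be chained
    }

    List<T> successes() {
        ensureOwnerAndJoined(); // results are only meaningful after join()
        return List.copyOf(successes);
    }
}

class CustomScopeDemo {
    // Forks three made-up sources, one of which fails, and returns what succeeded.
    static List<String> fetchAvailable() throws InterruptedException {
        try (var scope = new CollectSuccessesScope<String>()) {
            scope.fork(() -> "source1: sunny");
            scope.fork(() -> { throw new IllegalStateException("source2 down"); });
            scope.fork(() -> "source3: cloudy");
            // completion order is not deterministic, so sort for a stable result
            return scope.join().successes().stream().sorted().toList();
        }
    }
}
```

The design choice here is that a failing subtask does not affect its siblings; a stricter policy could call shutdown() inside handleComplete once a chosen condition is met.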