structured concurrency in java - loom
Abstract
Purpose: supporting easy-to-use, high-throughput lightweight concurrency and new programming models to it
Platform threads:
- abstraction over OS threads
- limited scalability (when using thread-per-request model)
- need for pooling Virtual threads:
- lightweight user threads
- highly scalable
- no need for polling
How to create virtual threads:
Thread.ofVirtual().start(runnable);
// same
Thread.startVirtualThread(runnable);
Benefits of structured concurrency:
- error handling with short-circuiting
- cancellation propagation
- clarity
- observability
CompletableFuture
vs structured concurrency
Get all
+--> reserVenue --+
| |
--+--> bookHotel ---+-->
| |
+--> buySupplies -+
CompletableFuture
example:
public static void createEvent() {
var future1 = CompletableFuture.supplyAsync(this::reserveVenue);
var future2 = CompletableFuture.supplyAsync(this::bookHotel);
var future3 = CompletableFuture.supplyAsync(this::buySupplies);
var futureEvent = CompletableFuture.allOf(future1, future2, future3)
.exceptionally(t -> {
throw new RuntimeException(t);
})
.thenApply(ignored -> {
var venue = future1.join();
var hotel = future2.join();
var supplies = future3.join();
return new Event(venue, hotel, supplies);
});
System.out.println("Event " + futureEvent.join());
}
Structured concurrency example:
public static void createEvent() {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(this::reserveVenue);
var task1 = scope.fork(this::bookHotel);
var task1 = scope.fork(this::buySupplies);
scope.join().throwIfFailed();
var venue = task1.get();
var hotel = task2.get();
var supplies = task3.get();
System.out.println("Event " + new Event(venue, hotel, supplies));
} catch (InterruptedException | ExceutionException e) {
throw new RuntimeException(e);
}
}
Get first
+--> getWeatherFromSource1 ---+
| |
--+--> getWeatherFromSource2 ---+-->
| |
+--> getWeatherFromSource3 ---+
CompletableFuture
example:
// this example cannot handle when one task throws an exception
public static void getWeather() {
var future1 = CompletableFuture.supplyAsync(() -> getWeatherFromSource1("Paris"));
var future2 = CompletableFuture.supplyAsync(() -> getWeatherFromSource2("Paris"));
var future3 = CompletableFuture.supplyAsync(() -> getWeatherFromSource3("Paris"));
CompletableFuture.anyOf(future1, future2, future3)
.exceptionally(t -> {
throw new RuntimeException(e);
})
.thenAccept(weather -> System.out.println(weather))
.join();
}
Structured concurrency example
// this example will not fail if one of the task throws an exception
public static void getWeather() {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
scope.fork(() -> getWeatherFromSource1("Paris"));
scope.fork(() -> getWeatherFromSource2("Paris"));
scope.fork(() -> getWeatherFromSource3("Paris"));
var weather = scope.join().result();
System.out.println(weather);
} catch (InterruptedException | ExceutionException e) {
throw new RuntimeException(e);
}
}
Get then
CompletableFuture
example:
-> getCurrentCustomer +-> getSavingsData +--> calculateOffer
| |
+-> getLoansData --+
public static void getOfferForCustoer() {
var future1 = CompletableFuture.supplyAsync(this::getCurrentCustomer);
var future2 = future1.thenApplyAsync(this::getSavingsData);
var future3 = future1.thenApplyAsync(this::getLoansData);
var customer = future1
.exceptionally(t -> {
throw new RuntimeException(e);
})
.join();
var future = future2
.thenCombine(future3, ((saving, loadns) -> new CustomerDetails(customer, savings, loans)))
.thenSupplyAsync(this::calculateOffer)
.exceptionally(t -> {
throw new RuntimeException(e);
});
System.out.println("Offer " + future.join());
}
Structured concurrency example:
public static void getOfferForCustomer() {
var customer = getCurrentCustomer();
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> getSavingsData(customer));
var task2 scope.fork(() -> getLoansData(customer));
scope.join().throwIfFailed();
var savings = task1.get();
var loans = task2.get();
var details = new CustomerDetails(customer, savings, loans);
var task3 = scope.fork(() -> calculateOffer(details));
scope.join().throwIfFailed();
var offer = task3.get();
// or just offer = calculateOffer(details);
// no need to use concurrency for the last step
System.out.println(offer);
} catch (InterruptedException | ExceutionException e) {
throw new RuntimeException(e);
}
}
Shutdown policies
`ShutdownOnFailure
- stops when one of the tasks “fail”
- cancels other tasks
- useful when you want that all tasks must complete successfully
ShutdownOnSuccess
- stops when one of the tasks “succeed”
- cancels other tasks
- useful when you want any one of the tasks to complete successfully
Custom
- extends
StructuredTaskScope
- override
handleComplete
- useful when you want custom login / some tasks to complete successfully