Adam Warski (@adamwarski), SoftwareMill
Scale By the Bay, November 12th, 2020
We want to concurrently run multiple processes.
We'd like to create a large number of processes in a cheap way.
Currently, the basic unit of concurrency is a Thread.
Threads map 1-1 to kernel threads:
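A minimal sketch of today's model (class and method names are illustrative): each `java.lang.Thread` is backed by a kernel thread, which is why creating them in large numbers is expensive.

```java
import java.util.concurrent.atomic.AtomicReference;

public class PlatformThreadDemo {
    // Runs a task on a freshly created platform (kernel-backed) thread.
    static String runOnNewThread() throws InterruptedException {
        AtomicReference<String> result = new AtomicReference<>();
        Thread t = new Thread(() -> result.set("done"));
        t.start();   // allocates a kernel thread, with its own native stack
        t.join();    // blocks the current (also kernel-backed) thread
        return result.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnNewThread());
    }
}
```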
CompletableFuture<T>
A value, representing a computation running in the background.
Eventually, the computation will yield a value of type T or end with an error.
var f1 = CompletableFuture.supplyAsync(() -> "x");
var f2 = CompletableFuture.supplyAsync(() -> 42);
var f3 = f1.thenCompose(v1 ->
f2.thenApply(v2 -> "Result: " + v1 + " " + v2));
var f = CompletableFuture
.supplyAsync(() -> 42)
.thenCompose(v1 ->
CompletableFuture.supplyAsync(() -> "Got: " + v1));
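A runnable version of the composition above (class name is illustrative); note that both futures start running as soon as they are created:

```java
import java.util.concurrent.CompletableFuture;

public class ComposeDemo {
    static String compose() {
        CompletableFuture<String> f = CompletableFuture
            .supplyAsync(() -> 42)            // starts running immediately
            .thenCompose(v1 ->                // sequential composition
                CompletableFuture.supplyAsync(() -> "Got: " + v1));
        return f.join();                      // blocks until the chain completes
    }

    public static void main(String[] args) {
        System.out.println(compose());        // Got: 42
    }
}
```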
boolean activateUser(Long userId) {
User user = database.findUser(userId);
if (user != null && !user.isActive()) {
database.activateUser(userId);
return true;
} else {
return false;
}
}
CompletableFuture<Boolean> activateUser(Long userId) {
return database.findUser(userId).thenCompose((u) -> {
if (u != null && !u.isActive()) {
return database.activateUser(userId)
.thenApply((r) -> true);
} else {
return CompletableFuture.completedFuture(false);
}
});
}
Futures
Project Loom aims to drastically reduce the effort of writing, maintaining, and observing high-throughput concurrent applications that make the best use of available hardware.
Thread t = Thread.startVirtualThread(() -> { ... });
// Blocking calls that, on a virtual thread, suspend cheaply instead of
// pinning a kernel thread (the null initializers are slide placeholders):
InputStream inputStream = null;
inputStream.read();

Writer writer = null;
writer.write("Hello, world");

Semaphore semaphore = null;
semaphore.acquire();

Thread.sleep(1000L);
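The point is that such a call blocks only the thread that issues it. A sketch with plain threads (which Loom makes cheap to multiply): two concurrent sleeps cost one sleep's worth of wall time, not two.

```java
public class ConcurrentSleepDemo {
    // Two threads each sleep 200 ms; since sleeping blocks only the
    // calling thread, total wall time is ~200 ms, not ~400 ms.
    static long twoSleeps() throws InterruptedException {
        long start = System.nanoTime();
        Thread a = new Thread(() -> sleepQuietly(200));
        Thread b = new Thread(() -> sleepQuietly(200));
        a.start(); b.start();
        a.join(); b.join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    static void sleepQuietly(long millis) {
        try { Thread.sleep(millis); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(twoSleeps() + " ms");
    }
}
```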
CompletableFuture<Boolean> activateUser(Long userId) {
return database.findUser(userId).thenCompose((u) -> {
if (u != null && !u.isActive()) {
return database.activateUser(userId)
.thenApply((r) -> true);
} else {
return CompletableFuture.completedFuture(false);
}
});
}
boolean activateUser(Long userId) {
User user = database.findUser(userId);
if (user != null && !user.isActive()) {
database.activateUser(userId);
return true;
} else {
return false;
}
}
A virtual thread is still a thread.
One way to reduce concurrency bugs is to avoid direct thread usage.
Futures give us sequential/parallel composition.
Using libraries: higher-level combinators
String sendHttpGet(String url) { ... }
String run(long userId) throws InterruptedException {
AtomicReference<String> profileResult =
new AtomicReference<String>(null);
AtomicReference<String> friendsResult =
new AtomicReference<String>(null);
CountDownLatch done = new CountDownLatch(2);
Thread.startVirtualThread(() -> {
String result = sendHttpGet(
"http://profile_service/get/" + userId);
profileResult.set(result);
done.countDown();
});
Thread.startVirtualThread(() -> {
String result = sendHttpGet(
"http://friends_service/get/" + userId);
friendsResult.set(result);
done.countDown();
});
done.await();
return "Profile: " + profileResult.get() +
", friends: " + friendsResult.get();
}
CompletableFuture<String> sendHttpGet(String url) { ... }
CompletableFuture<String> run(long userId) {
CompletableFuture<String> profileResult =
sendHttpGet("http://profile_service/get/" + userId);
CompletableFuture<String> friendsResult =
sendHttpGet("http://friends_service/get/" + userId);
return profileResult.thenCompose(profile ->
friendsResult.thenApply(friends ->
"Profile: " + profile + ", friends: " + friends));
}
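Since the two requests are independent, the same parallel composition can also be expressed with `thenCombine`. A runnable sketch with a stubbed `sendHttpGet` (the stub is an assumption standing in for a real HTTP client):

```java
import java.util.concurrent.CompletableFuture;

public class ParallelFuturesDemo {
    // Stub standing in for a real non-blocking HTTP call.
    static CompletableFuture<String> sendHttpGet(String url) {
        return CompletableFuture.supplyAsync(() -> "response from " + url);
    }

    static String run(long userId) {
        // Both futures start immediately, so the calls run in parallel.
        CompletableFuture<String> profileResult =
            sendHttpGet("http://profile_service/get/" + userId);
        CompletableFuture<String> friendsResult =
            sendHttpGet("http://friends_service/get/" + userId);
        // thenCombine waits for both and merges their results.
        return profileResult
            .thenCombine(friendsResult, (profile, friends) ->
                "Profile: " + profile + ", friends: " + friends)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(run(1L));
    }
}
```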
No style fits all use-cases
Futures
String sendHttpGet(String url) { ... }
String run(long userId) throws Exception {
  CompletableFuture<String> profileResult = run(() ->
    sendHttpGet("http://profile_service/get/" + userId)
  );
  CompletableFuture<String> friendsResult = run(() ->
    sendHttpGet("http://friends_service/get/" + userId)
  );
  return "Profile: " + profileResult.get() +
    ", friends: " + friendsResult.get();
}
String sendHttpGet(String url) { ... }
String run(long userId) {
return runParallel(
() -> sendHttpGet("http://profile_service/get/" + userId),
() -> sendHttpGet("http://friends_service/get/" + userId),
(profile, friends) -> "Profile: " + profile +
", friends: " + friends
);
}
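`runParallel` is not a JDK method; one way such a combinator might be implemented on top of an `ExecutorService` (a sketch under that assumption, with hypothetical names):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class Combinators {
    // Runs both suppliers in parallel and combines their results.
    static <A, B, R> R runParallel(Supplier<A> first, Supplier<B> second,
                                   BiFunction<A, B, R> combine) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<A> fa = executor.submit(first::get);
            Future<B> fb = executor.submit(second::get);
            return combine.apply(fa.get(), fb.get());  // waits for both
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runParallel(() -> "a", () -> 42,
            (s, i) -> s + i));
    }
}
```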
Loom gives us:
Loom doesn't give us:
We can represent a process as:
Futures
What kind of high-level operations might we expect?
retry, runAll, first, delay, ...
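One such combinator, sketched over `Callable` (a hypothetical helper, not from any library):

```java
import java.util.concurrent.Callable;

public class Retry {
    // Re-runs the description up to maxAttempts times,
    // rethrowing the last failure if all attempts fail.
    static <T> T retry(Callable<T> task, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("flaky");
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

This only works because a `Callable` is a description: calling it again re-runs the computation, which an already-started `Future` cannot do.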
Often we operate on descriptions of computations, e.g. Callable&lt;T&gt;.
ExecutorService
IOs
A Future is a value representing a running computation (eager).
An IO is a value representing a description of a computation (lazy).
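The eager/lazy distinction in miniature, using `Callable` as the lazy description (counter names are illustrative):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class EagerVsLazy {
    static final AtomicInteger eagerRuns = new AtomicInteger();
    static final AtomicInteger lazyRuns = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        // Eager: the computation starts the moment the future is created.
        CompletableFuture<Integer> future =
            CompletableFuture.supplyAsync(eagerRuns::incrementAndGet);
        future.join();

        // Lazy: creating the description runs nothing...
        Callable<Integer> description = lazyRuns::incrementAndGet;
        // ...and each call() re-runs it, so it can be retried and composed.
        description.call();
        description.call();

        System.out.println("eager ran " + eagerRuns.get()
            + " time(s), lazy ran " + lazyRuns.get() + " time(s)");
    }
}
```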
What IO offers above Callable:
cancellation / interruption / uninterruptible regions
separate process definition from sequencing
explicit concurrency
equational reasoning
composability
Processes represented as code.
IOs shine: orchestration
Processes represented as a value.
We haven't discussed:
All of these will be impacted by Loom.
Loom solves some of the problems.
But it doesn't eliminate the need for declarative concurrency.
Loom team:
Blogs which expand on what we've talked about today:
Structured concurrency:
Twitter threads: