```text Loading ... ``` --- ## Loom ∈ Java 21+ ```scala [|2,7] timed { val threads = new Array[Thread](10_000_000) val results = ConcurrentHashMap.newKeySet[Int]() for (i <- threads.indices) threads(i) = Thread .ofVirtual() .start(() => results.add(0).discard); threads.foreach(_.join()) } ``` ``` ~2 seconds ``` --- ## Akka streams ```scala [|10|11|7-8,12|13|4-5,14|15] implicit val system: ActorSystem = ActorSystem("demo") implicit val ec: ExecutionContext = system.dispatcher val nats = Source.unfold(0): i => Some((i + 1, i + 1)) def delayedCompute(i: Int): Future[Int] = after(5.seconds, using = system.scheduler)(Future(i * 3)) val done: Future[Done] = Source.range(1, 100) .throttle(1, 1.second) .mapAsync(4)(i => delayedCompute(i).map(_ + 1)) .filter(_ % 2 == 0) .zip(nats) .runForeach(println) ``` --- ## The problem Can we combine the developer experience: * of virtual threads * of "reactive" streaming libraries while keeping the performance? --- ## From Reactive Streams to Virtual Threads ### [Adam Warski](https://warski.org), [SoftwareMill](https://softwaremill.com) YavaConf, December 2025 --- ## Who am I? * co-founder of [SoftwareMill](https://softwaremill.com), R&D * software engineer: distributed systems, functional programming * [blogger](https://warski.org/articles/) * technologies: Java, Scala, Kafka, messaging, event sourcing * [OSS](https://warski.org/projects/): sttp client, Tapir, Hibernate Envers, Jox & more ---
Part I: Virtual Threads
--- ## Some history * part of Project Loom * 2017: start * 2023: stable release in Java 21 (LTS) * 2025: final restrictions lifted in Java 24/25 --- > Virtual threads are lightweight threads that reduce the effort of writing, maintaining, and debugging high-throughput concurrent applications. from [Oracle's website](https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html) --- ## But why? * "traditional" threads are "heavy" * Java used to have a synchronous model * but, the web happened --- ## Before Loom: Scaling threads Core idea: make sure threads are never idle
* thread pools * `Executor` * `Future`, `CompletableFuture`
--- ## In the old days ```scala val person = db.findById(id) if person.hasLicense() then bankingService.transferFunds(person, dealership, amount) dealerService.reserveCar(person) ``` --- ## More recently ```scala db.findById(id).flatMap: person => if person.hasLicense() then bankingService .transferFunds(person, dealership, amount) .flatMap: transferResult => dealerService.reserveCar(person) else Future.successful(()) ``` technical concerns > code readability --- ## Goals of Virtual Threads * maintain thread utilization * retain simplicity of synchronous code --- ## Goals of Virtual Threads * reintroduce **direct syntax** * eliminate **virality** * eliminate **lost context** --- #### Under the covers: Futures / IOs
--- #### Under the covers: Virtual Threads
--- ## In the new days ```scala val person = db.findById(id) if person.hasLicense() then bankingService.transferFunds(person, dealership, amount) dealerService.reserveCar(person) ``` --- ## Loom contributions * runtime: scheduler * API: virtual & platform threads * retrofitting blocking APIs Blocking is once again completely fine! ---
Part II: Reactive Streams
--- ## Reactive Streams > govern the exchange of stream data across an asynchronous boundary > provide a standard for asynchronous stream processing with non-blocking back pressure from [reactive-streams.org](https://www.reactive-streams.org) --- ## Reactive Streams Process streaming data: * efficiently utilize threads * declarative concurrency * interface with I/O operations * safely handle errors --- ## Reactive Streams: bounded memory
--- ## Akka/Pekko streams ```scala implicit val system: ActorSystem = ActorSystem("demo") implicit val ec: ExecutionContext = system.dispatcher val nats = Source.unfold(0): i => Some((i + 1, i + 1)) def delayedCompute(i: Int): Future[Int] = after(5.seconds, using = system.scheduler)(Future(i * 3)) val done: Future[Done] = Source.range(1, 100) .throttle(1, 1.second) .mapAsync(4)(i => delayedCompute(i).map(_ + 1)) .filter(_ % 2 == 0) .zip(nats) .runForeach(println) ``` --- ## How did we get here? * working with `Future`s is complex * high-level, FP-inspired API is better --- ## Reactive Streams & implementations * `Publisher` / `Subscriber`: standard * Akka/Pekko Streams, fs2, ZIO streams, RxJava, Reactor: implementations --- ## But ... * `Future`s/`IO`s are viral * "lifted" syntax: overhead * lost context in case of errors ---
Part III: Implementing simple streams
--- We'll take inspiration from:
--- ## Ox: Channels & Flows
--- ## Assumptions * built-in control flow * `for`, `if`, `try-catch-finally` * composing operations using the virtual `;` * not `.flatMap` / `.map` --- ## Stages & emits ```scala trait FlowStage[+T]: def run(emit: FlowEmit[T]): Unit trait FlowEmit[-T]: def apply(t: T): Unit ``` --- ## Top-level flow ```scala class Flow[+T](val last: FlowStage[T]) extends FlowOps[T] ``` --- ## Example infinite stream ```scala[|1|2|4-7] def iterate[T](zero: T)(f: T => T): Flow[T] = Flow(new FlowStage[T]: override def run(emit: FlowEmit[T]): Unit = var t = zero forever: emit(t) t = f(t) ) ``` --- ## Example consumer ```scala class Flow[+T](protected val last: FlowStage[T]): // ... def runToList(): List[T] = val b = List.newBuilder[T] last.run(new FlowEmit[T]: def apply(t: T): Unit = b += t ) b.result() ``` --- ## Example transformation ```scala[|4|6|7] class Flow[+T](protected val last: FlowStage[T]): // ... def map[U](f: T => U): Flow[U] = Flow(new FlowStage[T]: override def run(emit: FlowEmit[T]): Unit = last.run(new FlowEmit[T]: def apply(t: T): Unit = emit(f(t)) ) ) ``` --- ## Single-thread pipeline ```scala val result: List[Int] = Flows .iterate(1, _ + 1) .map(_ * 2) .filter(_ % 3 == 2) .take(10) .runToList() ``` ---
--- ## Java Streams? ```java List
result = Stream .iterate(1, i -> i + 1) .map(i -> i * 2) .filter(i -> i % 3 == 2) .limit(10) .collect(Collectors.toList()); ``` ---
Part IV: Asynchronous Infrastructure
--- ## Communicating threads * queues? * channels! --- ## Channels * queue interface * completable * Go-like select ---
--- ```scala val ch = Channel.buffered[Int](4) ch.send(1) ch.send(2) println(ch.receive()) ``` --- ```scala [|7] val ch1 = Channel.bufferedDefault[String] val ch2 = Channel.bufferedDefault[String] Thread.ofVirtual().start(() => ch1.send("v1")) Thread.ofVirtual().start(() => ch2.send("v2")) println(select(ch1.receiveClause(), ch2.receiveClause())) ``` --- ```scala val ch = Channel.buffered[String](4) ch.send("hello") ch.done() println("Received: " + ch.receiveOrClosed()) println("Received: " + ch.receiveOrClosed()) ``` ```text Received: hello Received: ChannelDone[] ``` --- ## Managing threads * using structured concurrency * built on top of [JEP-505](https://openjdk.org/jeps/505) * in preview --- ```scala[|1|2,6|10] val result = supervised: val f1 = scope.fork: sleep(500.millis) 5 val f2 = scope.fork: sleep(1000.millis) 6 f1.join() + f2.join() ``` --- ```scala[|4|11] val result = supervised: val f1 = scope.fork: sleep(500.millis) throw new RuntimeException("Giving up!"); val f2 = scope.fork: sleep(1000.millis) 6 f1.join() + f2.join() () ``` --- ## Let it crash! --- ```scala[|6,12] val result = supervised: val result = Promise[String]() val f1 = scope.fork: sleep(500.millis) result.complete(Success("Number 1 won!")) val f2 = scope.fork: sleep(1000.millis) result.complete(Success("Number 2 won!")) result.future.get() ``` --- ## Structured concurrency Syntactical structure of the code determines the lifetime of threads Threading becomes an implementation detail! ---
Part V: Asynchronous Streams
--- ```scala def merge[U >: T](other: Flow[U]): Flow[U] = Flow((emit: FlowEmit[U]) => unsupervised: val c1 = outer.runToChannel() val c2 = other.runToChannel() repeatWhile: selectOrClosed(c1, c2) match case ChannelClosed.Done => if c1.isClosedForReceive then channelToEmit(c2, emit) else channelToEmit(c1, emit) false case e: ChannelClosed.Error => throw e.toThrowable case r: U @unchecked => emit(r); true) ``` ---
--- ```scala [2|3|4-5|8|15] def merge[U >: T](other: Flow[U]): Flow[U] = Flow((emit: FlowEmit[U]) => unsupervised: val c1 = outer.runToChannel() val c2 = other.runToChannel() repeatWhile: selectOrClosed(c1, c2) match case ChannelClosed.Done => if c1.isClosedForReceive then channelToEmit(c2, emit) else channelToEmit(c1, emit) false case e: ChannelClosed.Error => throw e.toThrowable case r: U @unchecked => emit(r); true) ``` --- ## Other operators * `mapPar` * `zip` * `buffer` * `grouped` * ... --- ## Pekko/Akka streams ```scala implicit val system: ActorSystem = ActorSystem("demo") implicit val ec: ExecutionContext = system.dispatcher val nats = Source.unfold(0): i => Some((i + 1, i + 1)) def delayedCompute(i: Int): Future[Int] = after(5.seconds, using = system.scheduler)(Future(i * 3)) val done: Future[Done] = Source.range(1, 100) .throttle(1, 1.second) .mapAsync(4)(i => delayedCompute(i).map(_ + 1)) .filter(_ % 2 == 0) .zip(nats) .runForeach(println) ``` --- ## Ox/Jox Flows ```scala [|4|6|7-9|1-2,12|13] val nats = Flow.unfold(0)(i => Some((i + 1, i + 1))) Flow .range(1, 100, 1) .throttle(1, 1.second) .mapPar(4) { i => sleep(5.seconds) val j = i * 3 j + 1 } .filter(_ % 2 == 0) .zip(nats) .runForeach(println) ``` --- ## Backpressure? * limited buffers * thread blocking --- ## Error handling? * sync: just throw exceptions * `try-catch-finally` * `.onError`, `.onComplete` for convenience * async: structured concurrency * threading == implementation detail --- ```scala[|2,5|4,7] def runToChannel()(using OxUnsupervised): Source[T] = val ch = Channel.bufferedDefault[T] forkUnsupervised(ch) { try last.run((t: T) => ch.send(t)) ch.done() catch case e: Throwable => ch.error(e) }.discard ch ``` --- ```scala [3,14] def merge[U >: T](other: Flow[U]): Flow[U] = Flow((emit: FlowEmit[U]) => unsupervised: val c1 = outer.runToChannel() val c2 = other.runToChannel() repeatWhile: selectOrClosed(c1, c2) match case ChannelClosed.Done => if c1.isClosedForReceive then channelToEmit(c2, emit) else channelToEmit(c1, emit) false case e: ChannelClosed.Error => throw e.toThrowable case r: U @unchecked => emit(r); true) ``` --- ## Performance? ---
---
--- ## Java Streams?
java streams
ox flows
model
synchronous
synchronous
asynchronous
extensibility
gatherers
custom
FlowStage
purpose
collections
for-each
time-based
buffering
merging
event-driven/RT
Both: lazy, backpressured ---
Part VI: In Summary
--- ## Virtual Threads * since Java 21, improved in Java 25 * direct syntax: again * keeping the performance --- ## Structured Concurrency * code structure -> lifetime of threads * no "thread leaks" * threading: an implementation detail --- ## Ox Flows * powerful, familiar API * direct control flow * no `Future` / `IO` virality * lazy evaluation model * sync & async --- ## Reactive Streams * interoperability! * but: programmer-friendly APIs also possible on top of VTs * RS-based libraries: still have an edge * maturity * integrations --- ### Virtual Threads: the ultimate validation of Reactive Streams! --- ## What's next? Integrations, integrations, integrations! ---
### Solving the hard problems that our clients face, using software ---
--- ## What's available now * Java 21 w/ virtual threads * [Ox](https://github.com/softwaremill/ox): channels, flows, concurrency scopes * [Jox](https://github.com/softwaremill/jox): java version * Structured Concurrency, [JEP 505 (preview)](https://openjdk.org/jeps/505) ```java val starOnGitHub = true ``` --- ```java val thankYou = true ``` [https://warski.org](https://warski.org)
[X: @adamwarski](https://x.com/adamwarski) | [BlueSky: warski.org](https://bsky.app/profile/warski.org) | [LinkedIn](https://www.linkedin.com/in/adamwarski/)