In this lesson, we will cover the Reactor Execution Model and see how the execution of a reactive pipeline can be moved from one thread to another.
Reactor Execution Model
You’ve already learned that Reactive Programming is asynchronous and non-blocking. That means that threads are not blocked while they wait for data.
Let’s see one example:
import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        // Build the pipeline; nothing runs until we subscribe
        Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
                .map(String::toUpperCase)
                .filter(cityName -> cityName.length() <= 8)
                .map(cityName -> cityName.concat(" City"))
                .log();

        // Subscribing starts the flow of data
        cities.subscribe();
    }
}

Here, we applied a few operators to a Flux and then called the subscribe() method to start consuming data.
Let’s see the output:
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | request(unbounded)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onNext(NEW YORK City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onNext(LONDON City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onNext(PARIS City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onComplete()
Look at the thread name between the [ ] brackets. It says main, which means that the main thread did the work. And as you can see, every signal in the pipeline, from onSubscribe to onComplete, is handled by that same thread.
That is the default behavior of the Reactor Execution Model.
The same thread that performs a subscription will be used for the whole pipeline execution.
That is what happened in our example as well: the whole pipeline was executed by the main thread, which is the thread that started the subscription.
Note: If you run the example from a test, you may see a Test worker thread instead of the main thread. That is also fine.
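To make the rule above more concrete, here is a minimal sketch that subscribes from a thread other than main. The class name SubscribingThreadDemo, the helper method, and the thread name "my-subscriber" are made up for this illustration. Since no Scheduler is involved, the map operator should run on whichever thread calls subscribe().

```java
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SubscribingThreadDemo {

    // Hypothetical helper: subscribes from a new thread with the given name
    // and returns the names of the threads that executed the map operator.
    static List<String> threadsSeenWhenSubscribedFrom(String threadName) {
        List<String> seen = new CopyOnWriteArrayList<>();

        Flux<String> cities = Flux.just("New York", "London", "Paris")
                .map(city -> {
                    // Record which thread is running this operator
                    seen.add(Thread.currentThread().getName());
                    return city.toUpperCase();
                });

        // The subscription is performed by this thread, not by the caller
        Thread subscriber = new Thread(() -> cities.subscribe(), threadName);
        subscriber.start();
        try {
            // Flux.just emits synchronously, so the pipeline is done once the thread ends
            subscriber.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(threadsSeenWhenSubscribedFrom("my-subscriber"));
    }
}
```

Every recorded name should be my-subscriber, even though main built the pipeline. Which thread assembles the Flux does not matter; the thread that subscribes is the one that runs it.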
How to use multiple threads in Project Reactor?
If we know that some operations we want to perform on a Flux or Mono can be time-consuming, we probably don’t want to block the thread that started the execution. For this purpose, we can instruct Reactor to use a different Scheduler.
Project Reactor provides Schedulers, a factory class whose Scheduler instances can be used to switch threads during the execution of a reactive pipeline.
We have different Scheduler options that we can use:
- Schedulers.parallel() – It has a fixed pool of workers. The number of threads is equal to the number of CPU cores.
- Schedulers.boundedElastic() – It has a bounded elastic thread pool of workers. The number of threads grows on demand up to a cap (by default, 10 times the number of CPU cores), so it can be much bigger than the number of CPU cores. Used mainly for making blocking IO calls.
- Schedulers.single() – Reuses the same thread for all callers.
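One way to see these Schedulers at work is to hand each of them a small task and check which thread runs it. The sketch below is only an illustration: the class SchedulerThreadNames and the helper threadNameOf are hypothetical names, and the 5 second timeout is an arbitrary choice. It uses Scheduler.schedule(Runnable) directly, so no pipeline is needed.

```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SchedulerThreadNames {

    // Hypothetical helper: runs one task on the given Scheduler and
    // returns the name of the thread that executed it.
    static String threadNameOf(Scheduler scheduler) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        scheduler.schedule(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });

        try {
            // Wait for the task, since it runs on another thread
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println(threadNameOf(Schedulers.parallel()));
        System.out.println(threadNameOf(Schedulers.boundedElastic()));
        System.out.println(threadNameOf(Schedulers.single()));
    }
}
```

With the default configuration, the printed names should start with parallel-, boundedElastic- and single- respectively. The number after the prefix depends on which worker picked up the task.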
We can instruct Reactor to use the desired Scheduler with one of the following methods:
- The publishOn method
- The subscribeOn method
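As a quick preview, here is a minimal sketch of what calling each method can look like. The class name SwitchThreadPreview and its two helper methods are hypothetical, and the choice of Scheduler in each case is arbitrary. Each helper returns the names of the threads that ran the map operator.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.List;

class SwitchThreadPreview {

    // subscribeOn: the subscription, and with it the whole chain here,
    // runs on a boundedElastic worker instead of the calling thread.
    static List<String> threadsUsedWithSubscribeOn() {
        return Flux.just("New York", "London", "Paris")
                .map(city -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic())
                .collectList()
                .block(); // block() is used only to wait for the result in this demo
    }

    // publishOn: operators placed after it run on a parallel worker.
    static List<String> threadsUsedWithPublishOn() {
        return Flux.just("New York", "London", "Paris")
                .publishOn(Schedulers.parallel())
                .map(city -> Thread.currentThread().getName())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(threadsUsedWithSubscribeOn());
        System.out.println(threadsUsedWithPublishOn());
    }
}
```

In both cases the map operator no longer runs on the thread that called block().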
We will explore both methods in the next lesson.
Happy coding!