In one of the previous lessons, you saw that we have two types of Publishers in Project Reactor: Mono and Flux.
Mono is a Publisher that can emit 0 or 1 items, and Flux can emit 0 . . . n elements.
Publishers emit data asynchronously, and based on the behaviour, we can divide them into Hot and Cold Publishers.
Let’s start with the Cold Publishers first.
Cold Publishers in Project Reactor
Cold Publisher will not start emitting data until a Subscriber subscribes to it. It creates a new data producer for each new subscription.
Let’s take youtube as an example. One user clicks on the video, and the streaming starts. After a while, another user clicks on the same video, and streaming starts from the beginning for him.
So it doesn’t matter that one user was already watching the video when the other joined. The other user will get a separate data producer.
Example
class ReactiveJavaTutorial { public static void main(String[] args) throws InterruptedException { Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo) .delayElements(Duration.ofSeconds(2)); // each part will play for 2 seconds // First Subscriber netFlux.subscribe(part -> System.out.println("Subscriber 1: " + part)); // wait 5 seconds before next Subscriber joins Thread.sleep(5000); // Seconds Subscriber netFlux.subscribe(part -> System.out.println("Subscriber 2: " + part)); Thread.sleep(60000); } private static Stream<String> getVideo() { System.out.println("Request for the video streaming received."); return Stream.of("part 1", "part 2", "part 3", "part 4", "part 5"); } }
Hot Publishers in Project Reactor
With Hot Publishers, there will be only one data producer. All Subscribers listen to the data produced by the single data producer. The data is shared.
Imagine a TV station. It does not matter if there is no one to watch the program. It will be emitted regardless. Watchers can start watching anytime they want. But all watchers get the same info at any given moment.
Watchers would lose the content if they joined late. The same is with the Hot Publishers.
Let’s transform the Cold Publisher from the above example into a Hot Publisher. We can do that by using the share() method. This method will effectively turn Publisher into a hot task when the first Subscriber subscribes. Then, further Subscribers will share the same Subscription, and therefore, the same result.
Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo) .delayElements(Duration.ofSeconds(2)) .share(); // turn the cold publisher into a hot publisher
And the output is:
Request for the video streaming received. Subscriber 1: part 1 Subscriber 1: part 2 Subscriber 1: part 3 Subscriber 2: part 3 Subscriber 1: part 4 Subscriber 2: part 4 Subscriber 1: part 5 Subscriber 2: part 5
You can see that the second Publisher lost the first two parts of the movie. But it was able to continue watching along with the first one.
Using the refCount() method
With the refCount() method, we can set how many Subscribers need to be subscribed for the Hot Publisher to start emitting data. Without this method, the share() will behave as refCount(1), requiring at least one Subscriber.
If we use the refCount() and pass the 1, the output will be the same as in the above example when we used the share() method. But what will happen if we pass 2, for example? Let’s try that in the next example:
Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo) .delayElements(Duration.ofSeconds(2)) .publish() .refCount(2); // minSubscribers
In this case, the output is:
Request for the video streaming received. Subscriber 1: part 1 Subscriber 2: part 1 Subscriber 1: part 2 Subscriber 2: part 2 Subscriber 1: part 3 Subscriber 2: part 3 Subscriber 1: part 4 Subscriber 2: part 4 Subscriber 1: part 5 Subscriber 2: part 5
Here, the Hot Publisher waited until the Second Publisher subscribed and then started emitting data for both Subscribers simultaneously.
Using the cache() method
With the cache() method, the data will be stored into a cache as soon as the Publisher finishes emitting. Then, the data will be replayed, extracted from the cache for any further Subscribers.
Let’s modify the previous example and add the cache() method:
Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo) .delayElements(Duration.ofSeconds(2)) .cache();
The output is:
Request for the video streaming received. Subscriber 1: part 1 Subscriber 1: part 2 Subscriber 1: part 3 Subscriber 1: part 4 Subscriber 1: part 5 Subscriber 2: part 1 Subscriber 2: part 2 Subscriber 2: part 3 Subscriber 2: part 4 Subscriber 2: part 5
There was only one request for video streaming. The second Subscriber got its data from the cache, there was no repeated emitting or similar.
If you do not want to cache all the items, you can use the cache(int history) – where the history is the number of elements retained in the cache.
That was all about Hot and Cold Publishers in Project Reactor. Proceed to the next lesson.
Happy coding!