In the previous lessons, you’ve learned how to create Flux and Mono and consume data by subscribing to one of these Publishers.
In this post, we will dive into some of the internal workings, and you will see how Mono and Flux work and what is happening when we subscribe to a Publisher and start receiving data from it.
How do Mono and Flux work?
Let’s create a simple Flux from a list.
Flux<String> cities = Flux.fromIterable( new ArrayList<>(Arrays.asList("New York", "London", "Paris", "Toronto", "Rome")));
Now, let’s subscribe to make the data flow.
cities.subscribe();
Ok, now Flux started emitting data to us, but we are not doing anything with it. That’s fine for now. Let’s add some logging to see what is happening behind the scene.
Here is the complete program:
class ReactiveJavaTutorial { public static void main(String[] args) { Flux<String> cities = Flux.fromIterable( new ArrayList<>(Arrays.asList("New York", "London", "Paris", "Toronto", "Rome"))); cities.log().subscribe(); } }
Output: INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | request(unbounded) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(New York) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(London) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(Paris) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(Toronto) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onNext(Rome) INFO 6832 — [ main] reactor.Flux.Iterable.1 : | onComplete()
You can see from the above that when we subscribed to a Flux, it started emitting signals like onNext and onComplete. These are the part of the famous four types of signals that Publisher can emit. The other two are onSubscribe, which gets emitted as soon as we subscribe, and onError, which gets emitted if some exception happens that prevents Publisher from emitting the data.
Let’s see what is happening:
In the first log message, you see that the onSubscribe() method is invoked, and with that, the Publisher sends out the Subscription object. The second log shows the next step: Subscriber invokes the request() function of the Publisher requesting the Publisher to send the data. The Publisher starts to send data concurrently, as soon as it is available, in the form of a stream of events by invoking the onNext() function of the Subscriber interface.
Once all data is sent, the Publisher invokes the onComplete() method of the Subscriber and the whole process ends. In this example, we didn’t get the onError signal since no exception has occurred.
Let’s see one example where the onError() method gets executed by the Publisher:
class ReactiveJavaTutorial { public static void main(String[] args) { Mono<String> mono = Mono.fromSupplier(() -> { // let's throw an error here so that we can receive the onError signal from this Mono throw new RuntimeException("Mono failed to send the data!"); }); mono.subscribe( data -> System.out.println("Data received from the Mono: " + data), // onNext err -> System.out.println("Error received with message: " + err), // onError () -> System.out.println("Completed!") // onComplete ); } }
Output: Error received with message: java.lang.RuntimeException: Mono failed to send the data!
Here, we subscribed and provided three Consumers to catch every signal that Publisher sends. The Publisher was unable to send the data, so it sent an onError signal to notify the Subscriber, and within the onError signal, it sent an exception object.
That was all about how Mono and Flux work internally, proceed to the next lesson.
Happy coding!