To consume data from a reactive stream, in this case a Mono, we need to subscribe to it. In this post, you will learn how to subscribe to a Mono in Java Reactor.
How to Subscribe to a Mono?
When we subscribe to a Publisher (Mono), it starts emitting signals like:
- onNext
When the Publisher receives a request from the Subscriber, it starts emitting data as a stream of events by invoking the onNext() method of the Subscriber interface. A Mono emits at most one such event.
- onError
In case of an error, the exception is sent to the Subscriber as an event through the onError() method.
- onComplete
When the Publisher finishes sending data, it notifies the Subscriber via the onComplete() method.
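To make the order of these signals concrete, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. It is a toy model, not how Reactor implements Mono; SingleValuePublisher and recordSignals are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SignalOrderSketch {

    // Hypothetical toy publisher: emits one value on the first request, then completes.
    static final class SingleValuePublisher implements Flow.Publisher<String> {
        private final String value;

        SingleValuePublisher(String value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return; // already emitted or cancelled
                    }
                    done = true;
                    if (n <= 0) {
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    subscriber.onNext(value);   // the single data event
                    subscriber.onComplete();    // nothing more to send
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes a recording subscriber and returns the signals it saw, in order.
    static List<String> recordSignals(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(1); // demand is what triggers emission
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable throwable) {
                signals.add("onError(" + throwable.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(Hello), onComplete]
        System.out.println(recordSignals(new SingleValuePublisher("Hello")));
    }
}
```

The Subscriber first receives a Subscription, requests data through it, and only then gets onNext followed by onComplete.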
Take a look at Introduction to Reactive Streams for more on Stream events.
Nothing will happen with the Publisher if we don’t start consuming it.
When we call one of the overloaded subscribe() methods of Mono, we are requesting the Publisher to start emitting data.
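The fact that nothing happens before subscribing can be checked with a small sketch. It assumes reactor-core is on the classpath and uses Mono.fromSupplier, because the value passed to Mono.just is already computed when the Mono is created. The class name and the counter are hypothetical, added only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class LazySubscriptionSketch {

    // Returns {supplier calls before subscribe, supplier calls after subscribe}.
    static int[] countSupplierCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Creating the Mono only describes the work; the supplier is not run yet.
        Mono<String> mono = Mono.fromSupplier(() -> {
            calls.incrementAndGet();
            return "Hello";
        });
        int beforeSubscribe = calls.get();

        // Subscribing is what triggers the supplier.
        mono.subscribe();
        int afterSubscribe = calls.get();

        return new int[] {beforeSubscribe, afterSubscribe};
    }

    public static void main(String[] args) {
        int[] counts = countSupplierCalls();
        System.out.println("before subscribe: " + counts[0]); // before subscribe: 0
        System.out.println("after subscribe: " + counts[1]);  // after subscribe: 1
    }
}
```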
Let’s see an example:
    import reactor.core.publisher.Mono;

    class ReactiveJavaTutorial {

        public static void main(String[] args) {
            // create a Mono
            Mono<String> mono = Mono.just("Hello");

            // subscribe to a Mono
            mono.subscribe();
        }
    }
Here, we started consuming the data, but we are not doing anything with it.
Let’s print what we get from Mono:
    import reactor.core.publisher.Mono;

    class ReactiveJavaTutorial {

        public static void main(String[] args) {
            // create a Mono
            Mono<String> mono = Mono.just("Hello");

            // subscribe to a Mono
            mono.subscribe(data -> System.out.println(data));
        }
    }
Output: Hello
This was one simple example of how to subscribe to a Mono using the subscribe(Consumer<? super T> consumer) method that subscribes a Consumer to the Mono.
Let’s try something else. Let’s subscribe and define what should be triggered for each of the 3 events.
    import reactor.core.publisher.Mono;

    class ReactiveJavaTutorial {

        public static void main(String[] args) {
            // create a Mono
            Mono<String> mono = Mono.just("Hello");

            // subscribe to a Mono
            mono.subscribe(
                data -> System.out.println(data),        // onNext
                err -> System.out.println(err),          // onError
                () -> System.out.println("Completed!")   // onComplete
            );
        }
    }
Output:
Hello
Completed!
Here we used the subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) method. It subscribes a Consumer that receives the value emitted by this Mono, together with callbacks that handle an error and react to completion.
Now, let’s invoke the onError signal:
    import reactor.core.publisher.Mono;

    class ReactiveJavaTutorial {

        public static void main(String[] args) {
            // create a Mono
            Mono<String> mono = Mono.fromSupplier(() -> {
                throw new RuntimeException("Exception occurred!");
            });

            // subscribe to a Mono
            mono.subscribe(
                data -> System.out.println(data),           // onNext
                err -> System.out.println("ERROR: " + err), // onError
                () -> System.out.println("Completed!")      // onComplete
            );
        }
    }
Output: ERROR: java.lang.RuntimeException: Exception occurred!
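To compare the possible outcomes side by side, the sketch below subscribes with all three callbacks to a Mono that holds a value, an empty Mono, and a failing Mono, and records which callbacks fire. It assumes reactor-core is on the classpath; MonoOutcomesSketch and signalsOf are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

class MonoOutcomesSketch {

    // Subscribes with all three callbacks and returns the signals received, in order.
    static List<String> signalsOf(Mono<String> mono) {
        List<String> signals = new ArrayList<>();
        mono.subscribe(
            data -> signals.add("onNext: " + data),             // value emitted
            err -> signals.add("onError: " + err.getMessage()), // failure
            () -> signals.add("onComplete")                     // successful completion
        );
        return signals;
    }

    public static void main(String[] args) {
        // prints [onNext: Hello, onComplete]
        System.out.println(signalsOf(Mono.just("Hello")));
        // prints [onComplete]
        System.out.println(signalsOf(Mono.empty()));
        // prints [onError: Exception occurred!]
        System.out.println(signalsOf(Mono.error(new RuntimeException("Exception occurred!"))));
    }
}
```

An error ends the sequence, so onComplete is not called after onError, and an empty Mono completes without ever calling onNext.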
That was all about how to subscribe to a Mono in Java Reactor. Proceed to the next lesson.
Happy coding!