To get the data from the Publisher (Flux), we need to subscribe to it. In this post, you will learn to subscribe to a Flux in Java Reactor.
How to Subscribe to Flux in Java?
When we subscribe to a Publisher (Flux), it starts emitting signals like:
- onNext
When the Publisher receives a request from the Subscriber, it starts emitting data as a stream of events by invoking the onNext() method of the Subscriber interface.
- onError
If an error occurs, the exception is sent to the Subscriber as an event through the onError() method.
- onComplete
When the Publisher finishes sending data, it notifies the Subscriber via the onComplete() method.
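The signals listed above can be observed directly with a subscriber that records each callback. The following is a minimal sketch, assuming reactor-core is on the classpath. It uses Reactor's BaseSubscriber; the class name SignalOrderSketch and the method collectSignals are made up for illustration. It requests one element at a time to show that each onNext follows a request from the subscriber.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration
class SignalOrderSketch {

    // Subscribes to a small Flux and records every signal in arrival order
    static List<String> collectSignals() {
        List<String> signals = new ArrayList<>();

        Flux.just("data1", "data2", "data3")
            .subscribe(new BaseSubscriber<String>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    signals.add("onSubscribe");
                    request(1); // ask the Publisher for the first element
                }

                @Override
                protected void hookOnNext(String value) {
                    signals.add("onNext: " + value);
                    request(1); // ask for the next element
                }

                @Override
                protected void hookOnError(Throwable throwable) {
                    signals.add("onError: " + throwable.getMessage());
                }

                @Override
                protected void hookOnComplete() {
                    signals.add("onComplete");
                }
            });

        return signals;
    }

    public static void main(String[] args) {
        collectSignals().forEach(System.out::println);
    }
}
```

With Flux.just as the source, this should record onSubscribe, then one onNext per element, then onComplete. Note that onError and onComplete are terminal signals: after either of them, no further signals arrive.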
Take a look at Introduction to Reactive Streams for more on Stream events.
To make the data flow, you have to subscribe to the Flux using one of its subscribe() methods.
Example
    import reactor.core.publisher.Flux;

    class ReactiveJavaTutorial {

        public static void main(String[] args) {
            // Create a Flux
            Flux<String> flux = Flux.just("data1", "data2", "data3");

            // Subscribe to the Flux and print each element
            flux.subscribe(System.out::println);
        }
    }
Output:
    data1
    data2
    data3
When we call subscribe(), we are telling the Publisher to start sending data. Until then, nothing is emitted.
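To see that emission really starts with the subscribe() call, here is a small sketch, again assuming reactor-core is on the classpath. The class name LazySubscribeSketch and the log messages are hypothetical. It attaches a doOnNext side effect while assembling the Flux and checks when that side effect actually runs.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration
class LazySubscribeSketch {

    // Returns a log showing when elements are emitted relative to subscribe()
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Assembling the pipeline does not emit anything yet
        Flux<String> flux = Flux.just("data1", "data2", "data3")
            .doOnNext(data -> log.add("emitted " + data));

        log.add("before subscribe");

        // subscribe() is what triggers the emission
        flux.subscribe(data -> log.add("received " + data));

        log.add("after subscribe");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the "emitted" entries only appear after "before subscribe". Because Flux.just is used without any scheduler, the elements are delivered on the calling thread before subscribe() returns; other sources or operators may behave differently.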
We can also pass functions to subscribe() that will be executed for each of the signals that the Publisher sends.
Example
    import reactor.core.publisher.Flux;

    class ReactiveJavaTutorial {

        public static void main(String[] args) {
            Flux<String> flux = Flux.just("data1", "data2", "data3");

            flux.subscribe(
                data -> onNext(data),   // onNext
                err -> onError(err),    // onError
                () -> onComplete()      // onComplete
            );
        }

        // Called once for every element the Flux emits
        private static <T> void onNext(T data) {
            System.out.println("onNext: Data received: " + data);
        }

        // Called at most once, if the Flux terminates with an error
        private static void onError(Throwable err) {
            System.out.println("onError: Exception occurred: " + err.getMessage());
        }

        // Called at most once, if the Flux terminates successfully
        private static void onComplete() {
            System.out.println("onComplete: Completed!");
        }
    }
Output:
    onNext: Data received: data1
    onNext: Data received: data2
    onNext: Data received: data3
    onComplete: Completed!
If an error occurs that prevents the Publisher from publishing the data, the onError signal will be emitted containing the exception object.
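The error path can be sketched as follows, assuming reactor-core is on the classpath. The class name ErrorSignalSketch and the sample values are hypothetical. A map() step fails on one element, so the subscriber receives onError instead of the remaining data.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration
class ErrorSignalSketch {

    // Records the signals received when one element cannot be processed
    static List<String> run() {
        List<String> signals = new ArrayList<>();

        Flux.just("1", "2", "oops", "4")
            .map(Integer::parseInt) // throws NumberFormatException on "oops"
            .subscribe(
                value -> signals.add("onNext: " + value),
                err -> signals.add("onError: " + err.getClass().getSimpleName()),
                () -> signals.add("onComplete"));

        return signals;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here the first two elements arrive through onNext, then the failure arrives through onError. The element "4" is never delivered and onComplete is not called, since the error terminates the sequence.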
That’s it!
Is this a synchronous operation?