What Is Backpressure in Reactive Programming
In reactive programming, we deal with Publishers and Subscribers. The Publisher is usually a data source, such as a database, a remote service, or anything else that holds data, and it publishes that data to the Subscriber. In Project Reactor, we have two Publishers: Mono and Flux.
The Subscriber is the component that consumes data from the Publisher.
The interaction between the two starts when the Subscriber subscribes to the Publisher. The Publisher then hands the Subscriber a Subscription, through which the Subscriber requests data, and emits that data as a stream of events (onNext signals) as soon as it is available. Backpressure means using that request to limit how much data the Publisher may send, so that a fast Publisher cannot overwhelm a slower Subscriber.
When the Publisher finishes emitting data, it sends the onComplete signal to notify the Subscriber that no more data will follow. The whole process is explained in detail in this post: Introduction to Reactive Streams in Java.
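Let’s look at an example:
import java.util.ArrayList;
import java.util.Arrays;

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        // Publisher that emits the five city names in order
        Flux<String> cities = Flux.fromIterable(
                new ArrayList<>(Arrays.asList("New York", "London", "Paris", "Toronto", "Rome")));

        // log() prints every signal; subscribe() without arguments requests unbounded data
        cities.log().subscribe();
    }
}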
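The log() call above prints each signal as it happens. To make the order of those signals explicit, here is a minimal sketch that runs with only the JDK. It assumes the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher as stand-ins for Reactor's types; they define the same four signals, but they are not Reactor classes. The class name SignalOrderSketch and the method collectSignals are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SignalOrderSketch {

    /** Publishes the given items and returns the signals the Subscriber saw, in arrival order. */
    static List<String> collectSignals(List<String> items) {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        // SubmissionPublisher is the JDK's ready-made Publisher; it delivers signals asynchronously.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    signals.add("onSubscribe");
                    subscription.request(Long.MAX_VALUE); // unbounded demand, i.e. no backpressure
                }

                @Override
                public void onNext(String item) {
                    signals.add("onNext(" + item + ")");
                }

                @Override
                public void onError(Throwable error) {
                    signals.add("onError");
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete");
                    finished.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() sends onComplete once the submitted items have been delivered

        try {
            finished.await(5, TimeUnit.SECONDS); // wait for the terminal signal
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return signals;
    }

    public static void main(String[] args) {
        collectSignals(List.of("New York", "London", "Paris")).forEach(System.out::println);
    }
}
```

The Subscriber receives onSubscribe first, then one onNext per item, and finally onComplete, which is the same sequence the Reactor log shows for the cities Flux.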
Implementing Backpressure in Project Reactor
In the example above, since we didn’t modify the Subscription, the default behavior applies: subscribe() requests an unbounded amount of data, which means that the Subscriber expects the Publisher to emit all of its values.
Now, let’s implement backpressure by telling the Publisher how much data to send.
For this, we will use the subscribe() overload that accepts a Subscriber and pass it a BaseSubscriber. This class lets us call request() and cancel() directly on it, so we can override the default behavior.
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class ReactiveProgrammingTutorial {

    public static void main(String[] args) {
        Flux<Integer> publisher = Flux.range(1, 100).log();

        publisher.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(5); // request only 5 elements
            }
        });
    }
}
Here, we override the hookOnSubscribe() method, which is invoked upon subscription. Inside, we call the BaseSubscriber's request() method, which forwards the call to the Subscription, and tell the Publisher to send only 5 values.
Let’s see the output:
INFO 7224 --- [main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
INFO 7224 --- [main] reactor.Flux.Range.1 : | request(5)
INFO 7224 --- [main] reactor.Flux.Range.1 : | onNext(1)
INFO 7224 --- [main] reactor.Flux.Range.1 : | onNext(2)
INFO 7224 --- [main] reactor.Flux.Range.1 : | onNext(3)
INFO 7224 --- [main] reactor.Flux.Range.1 : | onNext(4)
INFO 7224 --- [main] reactor.Flux.Range.1 : | onNext(5)
As you can see, the Flux emitted exactly as many elements as requested. There is no onComplete signal in the log, because the remaining elements of the range were never requested.
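To see why the Publisher stops after five elements, it can help to look at the Publisher's side of the contract. The following is a simplified sketch under stated assumptions, not Reactor's actual implementation: RangePublisher is a hypothetical, single-threaded stand-in for Flux.range, written against the JDK's java.util.concurrent.Flow interfaces so that it runs without any dependencies. It ignores details a real Publisher must handle, such as non-positive requests and thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    /** Hypothetical stand-in for Flux.range: emits start..start+count-1, never more than requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private boolean terminated;

                @Override
                public void request(long n) {
                    // Emit at most n elements; stop early on cancel or when the range is exhausted.
                    for (long emitted = 0; emitted < n && !terminated && next < start + count; emitted++) {
                        subscriber.onNext(next++);
                    }
                    // Completion is signalled only once every element has been delivered.
                    if (!terminated && next == start + count) {
                        terminated = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    terminated = true;
                }
            });
        }
    }

    /** Requests `demand` elements up front from a range of `count` elements and records the signals. */
    static List<String> run(int count, long demand) {
        List<String> signals = new ArrayList<>();
        new RangePublisher(1, count).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("request(" + demand + ")");
                subscription.request(demand);
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run(100, 5)); // five onNext signals, no onComplete
        System.out.println(run(3, 5));   // three onNext signals, then onComplete
    }
}
```

With a range of 100 and a demand of 5, the sketch records the same request(5) and five onNext signals as the Reactor log above. When the range is shorter than the demand, the Publisher runs out of data first and sends onComplete.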
It is good practice to cancel a Subscription after receiving the last requested element, so the Publisher knows that no further requests will follow. For this, we will override the hookOnNext() method, which is invoked for every element emitted. Let’s extend the previous example:
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux<Integer> publisher = Flux.range(1, 100).log();

        publisher.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(5); // request only 5 elements
            }

            @Override
            protected void hookOnNext(Integer value) {
                if (value == 5) { // the range starts at 1, so 5 is the last requested element
                    cancel();
                }
            }
        });
    }
}