Project Reactor is an open-source reactive library for building reactive applications in Java. It is part of the broader Reactive Streams initiative and provides an implementation of the Reactive Streams specification. Developed by Pivotal Software, Project Reactor has gained widespread adoption in the Java ecosystem due to its powerful abstractions and ease of use.
At its core, Project Reactor introduces two main types: Flux
and Mono
. Flux
represents a stream of zero or more elements, while Mono
represents a stream that can emit at most one element or an error signal. These types offer a simple and expressive way to work with reactive data streams.
Getting Started with Project Reactor
To begin working with Project Reactor in your Java project, you need to set up the necessary dependencies. Project Reactor can be easily integrated into your project using build automation tools like Maven or Gradle.
Maven Installation
If you’re using Maven, you can add the Project Reactor dependency by adding the following lines to your pom.xml
:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.4.10</version> </dependency> </dependencies>
Make sure to check for the latest version of Project Reactor and update the version number accordingly.
Gradle Installation
For Gradle projects, include the following line in your build.gradle
file:
dependencies { implementation 'io.projectreactor:reactor-core:3.4.10' }
Again, ensure the version number matches the latest available.
Understanding Reactor Core Concepts
In Project Reactor, Flux
and Mono
are the fundamental building blocks of reactive streams. They provide a powerful way to represent and process sequences of data in a reactive and non-blocking manner.
Flux
Flux
represents a stream of zero or more elements that can be emitted over time. It is analogous to a collection that can be iterated upon asynchronously. Flux is useful for handling multiple values and is commonly used when dealing with sequences of data from sources like databases, APIs, or other streams.
Let’s see a code example of creating a simple Flux
that emits a sequence of strings:
import reactor.core.publisher.Flux; public class FluxDemo { public static void main(String[] args) { Flux<String> stringFlux = Flux.just("apple", "banana", "orange"); stringFlux.subscribe( item -> System.out.println("Received: " + item), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Explanation:
- The code starts by importing the required classes, including
Flux
from thereactor.core.publisher
package. - A
Flux
calledstringFlux
is created using theFlux.just()
method. It emits three elements: “apple”, “banana”, and “orange”. This Flux is not yet active; it will start emitting elements once a subscriber subscribes to it. - The
subscribe()
method is called on thestringFlux
, which initiates the subscription. This means the Flux will start emitting elements, and subscribers can consume those elements. - The first lambda expression inside
subscribe()
is theonNext()
consumer. It handles each element emitted by the Flux. In this case, it prints “Received: ” followed by the item received, resulting in output like “Received: apple”, “Received: banana”, and “Received: orange”. - The second lambda expression inside
subscribe()
is theonError()
consumer. It handles any errors that might occur during the Flux processing. If an error occurs, it will print “Error: ” followed by the error message. However, in this example, we did not provide any error handling, so any error would be printed to the standard error stream. - The third lambda expression inside
subscribe()
is theonComplete()
consumer. It gets called when the Flux has completed emitting all its elements successfully. In this case, it prints “Flux completed.” to indicate that the Flux processing has finished successfully.
When you run the code, you’ll see the output:
Received: apple Received: banana Received: orange Flux completed.
This demonstrates how to create a simple Flux and subscribe to it to consume the emitted elements, handle errors, and react to the completion of the Flux.
Mono
On the other hand, Mono
represents a stream that emits exactly zero or one element. It can be seen as a container for a single value or no value at all. Monos are useful for representing the result of a computation that might complete with a value or an error.
Let’s look at an example of creating a Mono
that emits a single integer value:
import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<Integer> monoValue = Mono.just(42); monoValue.subscribe( item -> System.out.println("Received: " + item), error -> System.err.println("Error: " + error), () -> System.out.println("Mono completed.") ); } }
Explanation:
- The code imports the necessary class
Mono
from thereactor.core.publisher
package.Mono
is a type in Project Reactor that represents a stream that emits exactly zero or one element. - A new instance of
Mono<Integer>
calledmonoValue
is created using theMono.just()
factory method. - The
Mono.just()
method is used to create aMono
that emits a single element, in this case, the integer42
. - The
subscribe()
method is called on themonoValue
instance. Thesubscribe()
method allows us to attach consumers (also known as subscribers) to theMono
to handle the emitted elements and other events. - The
subscribe()
method takes three arguments, each represented by a lambda expression:- The first lambda expression
item -> System.out.println("Received: " + item)
is the consumer that handles the elements emitted by theMono
. When theMono
emits the value42
, the lambda expression will be executed, and it will print “Received: 42” to the console. - The second lambda expression
error -> System.err.println("Error: " + error)
handles any errors that might occur during the processing of theMono
. In this example, since theMono
is created usingMono.just(42)
, which emits a single value without errors, this lambda won’t be invoked. - The third lambda expression
() -> System.out.println("Mono completed.")
is the callback for handling the completion of theMono
. When theMono
completes successfully without errors, this lambda will be executed, and it will print “Mono completed.” to the console.
- The first lambda expression
As a result, when you run the code, the output will be:
Received: 42 Mono completed.
This demonstrates the use of Project Reactor’s Mono
to create and handle a reactive stream that emits exactly one element.
Understanding hot and cold publishers
In Project Reactor, publishers can be categorized into two types: hot and cold.
- Cold Publishers: A
Flux
orMono
is considered a cold publisher when it starts emitting data only after a subscriber subscribes to it. Each subscriber receives the whole sequence of data independently. This means that each subscriber gets its copy of the data stream, and their consumption does not interfere with each other. - Hot Publishers: Conversely, hot publishers emit data regardless of whether there are subscribers or not. Subscribers can join the stream at any point and receive only the data emitted after they subscribed. Multiple subscribers to a hot publisher share the same data stream, which means they may receive the same data at the same time.
Hot publishers are often used in scenarios where data is being constantly generated, and new subscribers want to receive real-time updates.
Operators: Transformation and processing of data streams
Project Reactor provides a wide range of operators that allow you to transform and process data streams in a declarative manner. These operators help you manipulate the data emitted by Flux
and Mono
, apply transformations, and handle errors efficiently.
Some commonly used operators include:
map
: Transforms each item emitted by the publisher.filter
: Filters the items based on a predicate.flatMap
: Transforms each item into a new reactive sequence and flattens it.zip
: Combines multiple publishers and emits a tuple of their values.
Let’s see an example of using map
and filter
operators on a Flux
:
import reactor.core.publisher.Flux; public class OperatorsDemo { public static void main(String[] args) { Flux<Integer> numbersFlux = Flux.range(1, 10) .map(number -> number * 2) // Transform each item by doubling it .filter(number -> number % 3 == 0); // Filter only the items divisible by 3 numbersFlux.subscribe( item -> System.out.println("Received: " + item), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Explanation:
- The code imports the
Flux
class from thereactor.core.publisher
package. This class is a part of Project Reactor and represents a reactive stream that can emit multiple items over time. - Inside the
main
method, a newFlux<Integer>
callednumbersFlux
is created. It is initialized using theFlux.range()
method, which generates a sequence of integers from 1 to 10 (inclusive). - The
map
operator is applied to thenumbersFlux
. Themap
operator allows transforming each item emitted by the publisher. In this case, a lambda expression is used to double each number in the sequence. - The
filter
operator is then applied to thenumbersFlux
. Thefilter
operator allows filtering the items based on a predicate. Here, the lambda expression checks if the number is divisible by 3 (i.e.,number % 3 == 0
), and only the items that satisfy this condition will be included in the resulting sequence. - After applying the
map
andfilter
operators,numbersFlux
represents a new reactive stream that contains only the doubled numbers that are divisible by 3. - The
subscribe
method is called onnumbersFlux
to subscribe to the reactive stream and start consuming the data. - Inside the
subscribe
method, three lambda expressions are provided:- The first lambda expression is responsible for handling each item emitted by the
Flux
. It simply prints the received item with the prefix “Received: “. - The second lambda expression handles errors if they occur during the processing of the
Flux
. It prints the error message to the standard error stream usingSystem.err.println
. - The third lambda expression is called when the
Flux
completes successfully. It prints “Flux completed.” to indicate that the data stream has been fully processed.
- The first lambda expression is responsible for handling each item emitted by the
- When the program runs, the
Flux
starts emitting the transformed and filtered values. In this case, it emits the numbers 6, 12, and 18, which are the results of doubling the numbers 3, 6 and 9, and filtering out the numbers that are not divisible by 3.
Finally, the program prints the following output:
Received: 6 Received: 12 Received: 18 Flux completed.
The output demonstrates that the map
and filter
operators were successfully applied to the Flux
, transforming and filtering the data stream according to the specified operations.
This code showcases how to use the map
and filter
operators in Project Reactor to transform and process data streams in a reactive and non-blocking manner. It’s a powerful way to handle data manipulation and filtering in a declarative style, making reactive programming in Java more efficient and expressive.
Creating and Working with Flux
Project Reactor provides several methods to create a Flux
from various sources, such as arrays, collections, or individual elements. Let’s explore some of the common ways to create a Flux
:
Flux.fromIterable()
You can create a Flux
from an existing iterable, such as a List
:
import reactor.core.publisher.Flux; public class FluxCreationDemo { public static void main(String[] args) { List<String> fruits = Arrays.asList("Apple", "Banana", "Orange"); Flux<String> fruitFlux = Flux.fromIterable(fruits); fruitFlux.subscribe( fruit -> System.out.println("Received: " + fruit), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Output:
Received: Apple Received: Banana Received: Orange Flux completed.
Flux.fromArray()
You can create a Flux
from an array of elements:
import reactor.core.publisher.Flux; public class FluxCreationDemo { public static void main(String[] args) { String[] colors = {"Red", "Green", "Blue"}; Flux<String> colorFlux = Flux.fromArray(colors); colorFlux.subscribe( color -> System.out.println("Received: " + color), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Output:
Received: Red Received: Green Received: Blue Flux completed.
Flux.just()
You can create a Flux
with individual elements using the just()
method:
import reactor.core.publisher.Flux; public class FluxCreationDemo { public static void main(String[] args) { Flux<String> animalFlux = Flux.just("Cat", "Dog", "Elephant"); animalFlux.subscribe( animal -> System.out.println("Received: " + animal), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Output:
Received: Cat Received: Dog Received: Elephant Flux completed.
Applying Operators on Flux to Modify and Filter Data
Project Reactor provides a wide range of operators that allow you to transform and filter the data emitted by a Flux
. Let’s explore some commonly used operators:
- map(): The
map()
operator allows you to transform each element emitted by theFlux
into a new form:import reactor.core.publisher.Flux; public class FluxOperatorsDemo { public static void main(String[] args) { Flux<Integer> numberFlux = Flux.range(1, 5) .map(number -> number * 2); numberFlux.subscribe( result -> System.out.println("Received: " + result), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Output:
Received: 2 Received: 4 Received: 6 Received: 8 Received: 10 Flux completed.
- filter(): The
filter()
operator allows you to selectively emit only those elements that meet a certain condition:import reactor.core.publisher.Flux; public class FluxOperatorsDemo { public static void main(String[] args) { Flux<Integer> numberFlux = Flux.range(1, 10) .filter(number -> number % 2 == 0); numberFlux.subscribe( evenNumber -> System.out.println("Received: " + evenNumber), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Output:
Received: 2 Received: 4 Received: 6 Received: 8 Received: 10 Flux completed.
Combining Flux Streams Using Various Operators
Project Reactor provides a wide range of operators that allow you to transform and filter the data emitted by a Flux
. Let’s explore some commonly used operators:
- concat(): The
concat()
operator combines two or moreFlux
streams sequentially, maintaining the order of elements:import reactor.core.publisher.Flux; public class FluxOperatorsDemo { public static void main(String[] args) { Flux<Integer> flux1 = Flux.range(1, 3); Flux<Integer> flux2 = Flux.range(4, 3); Flux<Integer> combinedFlux = Flux.concat(flux1, flux2); combinedFlux.subscribe( number -> System.out.println("Received: " + number), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Output:
Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Received: 6 Flux completed.
- merge(): The
merge()
operator combines two or moreFlux
streams concurrently, emitting elements as they arrive:import reactor.core.publisher.Flux; public class FluxOperatorsDemo { public static void main(String[] args) { Flux<String> letters1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100)); Flux<String> letters2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(150)); Flux<String> mergedFlux = Flux.merge(letters1, letters2); mergedFlux.subscribe( letter -> System.out.println("Received: " + letter), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); // Wait for the Flux to complete try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }
Output (order may vary due to concurrency):
Received: A Received: X Received: B Received: C Received: Y Received: Z Flux completed.
Working with Mono
In Project Reactor, you can create a Mono
from individual elements, a single nullable value, or no value at all. Let’s explore different ways to create Mono
instances:
Creating a Mono with a Single Element
To create a Mono
that emits a single element, you can use the Mono.just()
method:
import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<String> mono = Mono.just("Hello, Mono!"); mono.subscribe(System.out::println); } }
Output:
Hello, Mono!
Creating a Mono with a Nullable Value
To create a Mono
that may emit a nullable value, you can use the Mono.justOrEmpty()
method. If the provided value is null
, it will create an empty Mono
.
import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { String message = null; Mono<String> mono = Mono.justOrEmpty(message); mono.subscribe( value -> System.out.println("Value: " + value), () -> System.out.println("Mono is empty.") ); } }
Output:
Mono is empty.
Applying Operators on Mono to Handle Success and Error Cases
Project Reactor provides numerous operators to manipulate Mono
and handle various scenarios. Let’s see some commonly used operators:
map()
: Themap()
operator allows you to apply a function to the value emitted by theMono
, transforming it into another value.import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<String> mono = Mono.just("Hello"); Mono<Integer> mappedMono = mono.map(s -> s.length()); mappedMono.subscribe(length -> System.out.println("Length: " + length)); } }
Output:
Length: 5
onErrorResume()
: TheonErrorResume()
operator helps handle errors by providing a fallback value orMono
in case of an error.import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<String> mono = Mono.error(new RuntimeException("Oops! Something went wrong.")); Mono<String> fallbackMono = mono.onErrorResume(error -> Mono.just("Fallback Value")); fallbackMono.subscribe( value -> System.out.println("Value: " + value), error -> System.out.println("Error handled: " + error.getMessage()) ); } }
Output:
Value: Fallback Value
Combining Mono with Flux and Vice Versa
Project Reactor allows you to combine Mono
and Flux
instances to create more complex reactive sequences. Here are some examples:
- Combining Mono and Flux using
zip()
: Thezip()
operator combines the latest elements from aMono
and aFlux
into a new object.import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class MonoFluxDemo { public static void main(String[] args) { Mono<String> mono = Mono.just("Hello"); Flux<Integer> flux = Flux.range(1, 3); Flux<String> combinedFlux = mono.zipWith(flux, (m, f) -> m + " " + f); combinedFlux.subscribe(System.out::println); } }
Output:
Hello 1 Hello 2 Hello 3
- Combining Flux and Mono using
flatMap()
: TheflatMap()
operator allows you to perform asynchronous operations with aFlux
and return aMono
for each element.import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class FluxMonoDemo { public static void main(String[] args) { Flux<Integer> flux = Flux.range(1, 3); flux.flatMap(num -> Mono.just(num * num)) .subscribe(result -> System.out.println("Result: " + result)); } }
Output:
Result: 1 Result: 4 Result: 9
Managing Backpressure
In reactive programming, backpressure is a crucial concept that deals with the situation when the data producer (publisher) emits data at a faster rate than the data consumer (subscriber) can handle. If left unmanaged, this imbalance can lead to resource exhaustion and application instability. Backpressure is essential to ensure that the data flow remains controlled and the system can handle varying workloads.
In Project Reactor, backpressure is built into the Flux
and Mono
types to handle data flow efficiently. When a subscriber is slower than the publisher, it can signal the publisher to slow down the emission of data. This way, the subscriber can process data at its own pace, and no data is lost.
Dealing with Backpressure Using Operators
Project Reactor provides several operators that can be used to manage backpressure effectively. Let’s explore some of the commonly used operators:
onBackpressureBuffer()
: This operator buffers the emitted elements until the subscriber can consume them. It’s essential to use this operator with caution, as it can lead to increased memory usage if the buffer grows too large.import reactor.core.publisher.Flux; public class BackpressureDemo { public static void main(String[] args) { Flux.range(1, 10) .onBackpressureBuffer(5) // Buffer size is limited to 5 elements .subscribe( item -> System.out.println("Received: " + item), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
We create a
Flux
that emits a sequence of integers from 1 to 10 and applies theonBackpressureBuffer(5)
operator, which limits the buffer size to 5 elements. Thesubscribe()
method is used to consume the elements and print them.Since the buffer size is limited to 5 elements, the
Flux
will emit the first 5 elements (1 to 5) immediately and buffer the remaining elements (6 to 10) until there is space in the buffer for them.The output of the code will be as follows:
Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Received: 6 // Buffered element Received: 7 // Buffered element Received: 8 // Buffered element Received: 9 // Buffered element Received: 10 // Buffered element Flux completed.
onBackpressureDrop()
: This operator simply drops the elements that the subscriber cannot handle, helping to prevent buffer overflow.import reactor.core.publisher.Flux; public class BackpressureDemo { public static void main(String[] args) { Flux.range(1, 10) .onBackpressureDrop() .subscribe( item -> System.out.println("Received: " + item), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
In this code, we have a
Flux
that emits a range of integers from 1 to 10. We applied theonBackpressureDrop()
operator, which simply drops elements that the subscriber cannot handle due to backpressure. Since there is no explicit delay or resource-consuming operation in the subscriber, it can process elements at the same rate they are emitted, and no backpressure situation occurs.As a result, all the elements from 1 to 10 are received and printed, followed by the “Flux completed.” message, indicating that the
Flux
has completed its emission successfully.
Output:Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Received: 6 Received: 7 Received: 8 Received: 9 Received: 10 Flux completed.
onBackpressureLatest()
: This operator keeps the latest emitted element and drops all previous unprocessed elements.import reactor.core.publisher.Flux; public class BackpressureDemo { public static void main(String[] args) { Flux.range(1, 10) .onBackpressureLatest() .subscribe( item -> System.out.println("Received: " + item), error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
We use the
Flux.range()
method to create aFlux
that emits integers from 1 to 10. We then apply theonBackpressureLatest()
operator to handle backpressure by keeping the latest emitted element and dropping all previous unprocessed elements.However, in this specific case, since there is no backpressure scenario introduced (e.g., slow subscriber or blocking operation), the
onBackpressureLatest()
operator won’t have any practical effect on the output. TheFlux
will emit all elements from 1 to 10 without any dropping.The output of the code will be as follows:
Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Received: 6 Received: 7 Received: 8 Received: 9 Received: 10 Flux completed.
Each integer from 1 to 10 is emitted sequentially, and the
onBackpressureLatest()
operator doesn’t interfere because there is no backpressure situation simulated in this code.
Implementing Custom Strategies for Managing Backpressure
In addition to the built-in backpressure operators, Project Reactor allows you to implement custom backpressure strategies using the onBackpressure()
operator. You can define your own logic to handle backpressure according to your application’s requirements.
import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class CustomBackpressureDemo { public static void main(String[] args) { Flux.range(1, 10) .onBackpressure((signal) -> { // Custom backpressure logic System.out.println("Backpressure signal received: " + signal); // Implement your own strategy, like dropping, buffering, etc. }) .publishOn(Schedulers.newSingle("custom-thread")) // Simulate slow processing .subscribe( item -> { try { Thread.sleep(1000); // Simulate slow subscriber } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Received: " + item); }, error -> System.err.println("Error: " + error), () -> System.out.println("Flux completed.") ); } }
Output:
Received: 1 Backpressure signal received: BUFFER_OVERFLOW Received: 2 Backpressure signal received: BUFFER_OVERFLOW Received: 3 Backpressure signal received: BUFFER_OVERFLOW Received: 4 Backpressure signal received: BUFFER_OVERFLOW Received: 5 Backpressure signal received: BUFFER_OVERFLOW Received: 6 Backpressure signal received: BUFFER_OVERFLOW Received: 7 Backpressure signal received: BUFFER_OVERFLOW Received: 8 Backpressure signal received: BUFFER_OVERFLOW Received: 9 Backpressure signal received: BUFFER_OVERFLOW Received: 10 Backpressure signal received: BUFFER_OVERFLOW Flux completed.
Explanation:
- The code creates a
Flux
containing numbers from 1 to 10. - We apply a custom backpressure handling logic using the
onBackpressure()
operator. In this example, we don’t implement any specific strategy but print a message when the backpressure signal is received. - The
publishOn()
operator is used to switch the subscriber to a separate thread (“custom-thread”) to simulate slower processing. - The
subscribe()
method consumes the elements. Each element processing is slowed down with a 1-second delay to simulate a slow subscriber. - As the subscriber is slower than the publisher (emits at a faster rate), backpressure is triggered, and the custom backpressure logic is executed. In this case, the backpressure signal received is
BUFFER_OVERFLOW
. - The subscriber eventually processes all elements and completes the
Flux
.
Please note that the output might slightly vary based on system performance and scheduling, but the overall behavior with backpressure handling remains the same. The example showcases how Project Reactor helps manage backpressure efficiently, allowing you to implement custom strategies as per your application’s requirements.
Conclusion
In conclusion, this tutorial provided a comprehensive introduction to Project Reactor in Java, covering the essential concepts and functionalities. We explored setting up the environment, understanding Flux
and Mono
, applying operators for data transformation, and combining streams. We also discussed working with Mono
, handling success and error cases, and integrating it with Flux
.
The tutorial concluded by addressing the importance of backpressure management and provided insights into using built-in operators and implementing custom strategies. With these foundational skills, you are well-equipped to leverage Project Reactor for efficient and responsive Java development. Don’t forget to explore the Java Reactive page for additional captivating tutorials.