Java Reactive

Project Reactor is an open-source reactive library for building reactive applications in Java. It is part of the broader Reactive Streams initiative and provides an implementation of the Reactive Streams specification. Developed by Pivotal Software, Project Reactor has gained widespread adoption in the Java ecosystem due to its powerful abstractions and ease of use. At…

Read More Introduction to Project Reactor in Java

In the ever-evolving world of software development, responsiveness and efficiency are paramount. Reactive Programming provides a powerful paradigm that allows you to build applications that are highly responsive, scalable, and resilient. In this tutorial, we will delve into the world of Reactive Programming in Java, exploring its key concepts and how it can benefit your…

Read More Introduction to Reactive Streams in Java
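To make the Reactive Streams contract a little more concrete, here is a minimal sketch that uses only the JDK. Since Java 9, `java.util.concurrent.Flow` mirrors the four Reactive Streams interfaces. The names `FlowSketch`, `RangePublisher`, and `collectSignals` are hypothetical and chosen only for this illustration. The publisher is deliberately simplified: it is synchronous and skips several rules of the specification (for example, rejecting non-positive requests), so treat it as a teaching aid rather than a compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class FlowSketch {

    /** Hypothetical synchronous publisher that emits the integers 1..count on demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items, never more than the subscriber asked for.
                    for (long i = 0; i < n && next <= count && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    // Signal completion exactly once, after the last item.
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with unbounded demand and records every signal in arrival order. */
    static List<String> collectSignals(Flow.Publisher<Integer> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(Long.MAX_VALUE); // unbounded demand
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError(" + error.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        // Prints onSubscribe, onNext(1), onNext(2), onNext(3), onComplete (one per line)
        collectSignals(new RangePublisher(3)).forEach(System.out::println);
    }
}
```

The signal order (`onSubscribe`, then `onNext` for each item, then a terminal `onComplete` or `onError`) is the same sequence that appears in the Reactor `log()` output of the excerpts on this page.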

Reactive Programming, a powerful paradigm in modern software development, changes the way we handle data and events. By embracing reactive principles, developers can build highly responsive and scalable applications. In this tutorial, we will explore the fundamentals of Reactive Programming, its benefits, and its common use cases, and work through real-world examples that demonstrate its practical applications.…

Read More Introduction to Reactive Programming

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.1.0.RELEASE</version>
    <scope>test</scope>
</dependency>

class ReactiveJavaTutorialTest {

    @Test
    public void testFlux() {
        // Create a Flux
        Flux<Integer> fluxToTest = Flux.just(1, 2, 3, 4, 5);

        // Create a test
        StepVerifier.create(fluxToTest)
            .expectNext(1)
            .expectNext(2)
            .expectNext(3)
            .expectNext(4)
            .expectNext(5)
            .expectComplete() // we expect the Flux to complete after emitting 5, the last element
            .verify();
    }…

Read More Testing with StepVerifier in Project Reactor

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux<String> cities = Flux.fromIterable(
            new ArrayList<>(Arrays.asList("New York", "London", "Paris", "Toronto", "Rome")));

        cities.log().subscribe();
    }
}

Output:

INFO 6832 --- [ main] reactor.Flux.Iterable.1 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
INFO 6832 --- [ main] reactor.Flux.Iterable.1 : | request(unbounded)
INFO 6832 --- [ main] reactor.Flux.Iterable.1 : | onNext(New York)
INFO…

Read More Implementing Backpressure in Project Reactor
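The log in the excerpt above shows `request(unbounded)`, meaning the subscriber asked for everything at once. As a contrast, the following sketch shows bounded demand, which is the core idea behind backpressure. It uses only the JDK's `java.util.concurrent.Flow` interfaces rather than Reactor, and the names `BackpressureSketch`, `ManualSubscriber`, and `fromList` are hypothetical. The publisher is a simplified synchronous one and is not a complete Reactive Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackpressureSketch {

    /** Hypothetical subscriber that never requests on its own; the caller controls demand. */
    static final class ManualSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed;
        Throwable error;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // no request yet, so nothing is emitted
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable error) {
            this.error = error;
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        /** Signals demand for n more items. */
        void request(long n) {
            subscription.request(n);
        }
    }

    /** Hypothetical synchronous publisher over a fixed list; emits only what was requested. */
    static Flow.Publisher<String> fromList(List<String> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    public static void main(String[] args) {
        ManualSubscriber subscriber = new ManualSubscriber();
        fromList(List.of("New York", "London", "Paris", "Toronto", "Rome")).subscribe(subscriber);

        subscriber.request(2);
        System.out.println(subscriber.received); // [New York, London]

        subscriber.request(2);
        System.out.println(subscriber.received); // [New York, London, Paris, Toronto]

        subscriber.request(2); // only one item is left, so the stream completes
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

Because the publisher emits only in response to `request(n)`, a slow consumer can keep its workload bounded by requesting small batches instead of unbounded demand.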

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(String::toUpperCase)
            .filter(cityName -> cityName.length() <= 8)
            .map(cityName -> cityName.concat(" City"))
            .log();

        cities.subscribe();
    }
}

Output:

INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | request(unbounded)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : |…

Read More subscribeOn and publishOn operators in Project Reactor

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(String::toUpperCase)
            .filter(cityName -> cityName.length() <= 8)
            .map(cityName -> cityName.concat(" City"))
            .log();

        cities.subscribe();
    }
}

INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | request(unbounded)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : |…

Read More Reactor Execution Model – Threading and Schedulers

class ReactiveJavaTutorial {

    public static void main(String[] args) throws InterruptedException {
        Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo)
            .delayElements(Duration.ofSeconds(2)); // each part will play for 2 seconds

        // First Subscriber
        netFlux.subscribe(part -> System.out.println("Subscriber 1: " + part));

        // Wait 5 seconds before the next Subscriber joins
        Thread.sleep(5000);

        // Second Subscriber
        netFlux.subscribe(part -> System.out.println("Subscriber 2: " + part));

        Thread.sleep(60000);
    }…

Read More Hot and Cold Publishers in Project Reactor

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Mono.just("data1")
            .concatWith(Flux.error(new RuntimeException("Exception occurred.")))
            .doOnError(ex -> System.out.println("LOG: Exception caught: " + ex))
            .retry(3) // retry 3 times in case of an error
            .log()
            .subscribe();
    }
}

Output:

reactor.Flux.Retry.1 : onSubscribe(FluxRetry.RetrySubscriber)
reactor.Flux.Retry.1 : request(unbounded)
reactor.Flux.Retry.1 : onNext(data1)
LOG: Exception caught: java.lang.RuntimeException: Exception occurred.
reactor.Flux.Retry.1 : onNext(data1)
LOG: Exception…

Read More Retry Failed Operation in Project Reactor

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux.just(2, 7, 10)
            .concatWith(Flux.error(new RuntimeException("Exception occurred")))
            .concatWith(Mono.just(12))
            .log()
            .subscribe();
    }
}

Output:

reactor.Flux.ConcatArray.1 : onSubscribe(FluxConcatArray.ConcatArraySubscriber)
reactor.Flux.ConcatArray.1 : request(unbounded)
reactor.Flux.ConcatArray.1 : onNext(2)
reactor.Flux.ConcatArray.1 : onNext(7)
reactor.Flux.ConcatArray.1 : onNext(10)
reactor.Flux.ConcatArray.1 : onError(java.lang.RuntimeException: Exception occurred)
reactor.Flux.ConcatArray.1 : java.lang.RuntimeException: Exception occurred
    at com.example.demo.DemoApplication.main(DemoApplication.java:14) ~[classes/:na]

You can see that element…

Read More Handling Exceptions in Project Reactor

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux.just("data1", "data2", "data3")
            .doOnSubscribe(subscription -> System.out.println("Subscription: " + subscription))
            .subscribe();
    }
}

Output:

Subscription: reactor.core.publisher.FluxArray$ArraySubscription@72bca894

doOnNext() method

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux.just("data1", "data2", "data3")
            .doOnNext(data -> System.out.println("Data: " + data))
            .subscribe();
    }
}

Output:

Data: data1
Data: data2
Data: data3

doOnComplete()…

Read More doOn Callbacks in Project Reactor

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux<String> firstFlux = Flux.fromArray(new String[]{"a", "b", "c"});
        Flux<String> secondFlux = Flux.fromArray(new String[]{"d", "e", "f"});

        // Combine two Flux publishers
        Flux<String> combinedFlux = Flux.concat(firstFlux, secondFlux);

        // Subscribe and print the elements of the combined Flux
        combinedFlux.subscribe(element -> System.out.print(element + " "));
    }
}

Output:

a b c…

Read More Combine Flux and Mono Publishers

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux.fromArray(new String[]{"Tom", "Melissa", "Steve", "Megan"})
            .map(String::toUpperCase)
            .subscribe(System.out::println);
    }
}

Output:

TOM
MELISSA
STEVE
MEGAN

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux.fromArray(new String[]{"Tom", "Melissa", "Steven", "Megan"})
            .filter(name -> name.length() > 5)
            .map(String::toUpperCase)
            .subscribe(System.out::println);
    }
}

Output:

MELISSA
STEVEN

Note: Reactive Streams are immutable.…

Read More Transform Flux and Mono Using Operators

Flux<String> cities = Flux.fromIterable(
    new ArrayList<>(Arrays.asList("New York", "London", "Paris", "Toronto", "Rome")));

cities.subscribe();

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Flux<String> cities = Flux.fromIterable(
            new ArrayList<>(Arrays.asList("New York", "London", "Paris", "Toronto", "Rome")));

        cities.log().subscribe();
    }
}

Output:

INFO 6832 --- [ main] reactor.Flux.Iterable.1 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
INFO 6832 --- [ main] reactor.Flux.Iterable.1 : |…

Read More How Mono and Flux Work Internally?

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        Mono<String> mono = Mono.just("data");
        Flux<String> fluxFromMono = Flux.from(mono);

        fluxFromMono.subscribe(System.out::println);
    }
}

Output:

data

class ReactiveJavaTutorial {

    public static void main(String[] args) {
        // one value
        Flux<String> flux1 = Flux.just("data1");
        Mono<String> monoFromFlux1 = flux1.next();

        // get data from mono
        monoFromFlux1.subscribe(data -> System.out.println("monoFromFlux1 data: " + data));

        //…

Read More Convert Mono to Flux and vice versa