When working with Reactive Streams, if an error occurs while the Publisher is emitting data, the flow is interrupted and an onError signal is sent to the Subscriber. onError is a terminal signal: no other signals are sent after it.
Example
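    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    class ReactiveJavaTutorial {
        public static void main(String[] args) {
            Flux.just(2, 7, 10)
                .concatWith(Flux.error(new RuntimeException("Exception occurred")))
                .concatWith(Mono.just(12)) // never emitted: the error ends the sequence
                .log()
                .subscribe();
        }
    }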
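To make the terminal nature of onError easier to see without reading log output, the following minimal sketch records every signal the subscriber receives. The class name `TerminalErrorSketch` and the helper `collectSignals()` are illustrative names that do not come from the example above, and the sketch assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TerminalErrorSketch {

    // Records each signal the subscriber receives, in arrival order.
    // Hypothetical helper, introduced only for this illustration.
    static List<String> collectSignals() {
        List<String> signals = new ArrayList<>();
        Flux.just(2, 7, 10)
            .concatWith(Flux.error(new RuntimeException("Exception occurred")))
            .concatWith(Mono.just(12)) // never subscribed to, because the error arrives first
            .subscribe(
                value -> signals.add("onNext(" + value + ")"),
                error -> signals.add("onError(" + error.getMessage() + ")"),
                () -> signals.add("onComplete()"));
        return signals;
    }

    public static void main(String[] args) {
        // prints [onNext(2), onNext(7), onNext(10), onError(Exception occurred)]
        System.out.println(collectSignals());
    }
}
```

Neither the value 12 nor onComplete() shows up in the recorded list, since nothing follows the onError signal.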
How to handle exceptions in Project Reactor?
In Project Reactor, we can handle exceptions using operators such as:
- onErrorReturn()
- onErrorResume()
- onErrorContinue()
- onErrorMap()
- doOnError()
These exception handling operators are defined on both the Mono and Flux classes.
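Since the examples in this article use Flux, here is a minimal sketch of the same idea on a Mono. The class name `MonoErrorHandlingSketch`, the method `recover()`, and the fallback string are assumptions made for illustration. block() is used only to keep the sketch short.

```java
import reactor.core.publisher.Mono;

public class MonoErrorHandlingSketch {

    // A Mono that fails immediately, recovered with a fallback value.
    // Hypothetical helper, introduced only for this illustration.
    static String recover() {
        return Mono.<String>error(new IllegalStateException("lookup failed"))
            .onErrorReturn("default")
            .block();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // prints default
    }
}
```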
Handling exceptions in Project Reactor with the onErrorReturn() operator
onErrorReturn() emits the specified fallback value when an error occurs and then completes the sequence. In this way, the code recovers from the exception.
Example
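    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    class ReactiveJavaTutorial {
        public static void main(String[] args) {
            Flux.just(2, 7, 10)
                .concatWith(Flux.error(new RuntimeException("Exception occurred")))
                .concatWith(Mono.just(12))
                .onErrorReturn(72) // emit 72 in place of the error, then complete
                .log()
                .subscribe();
        }
    }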
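As a rough sketch of what the subscriber ends up with, the pipeline can be collected into a list. The second method uses the `onErrorReturn(Class, T)` overload to recover only from one exception type. The class and method names here are illustrative, and block() is used only to keep the sketch short.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OnErrorReturnSketch {

    // Same pipeline as in the article, collected into a list for inspection.
    static List<Integer> recoverWithFallback() {
        return Flux.just(2, 7, 10)
            .concatWith(Flux.error(new RuntimeException("Exception occurred")))
            .concatWith(Mono.just(12)) // skipped: the error arrives before this source is used
            .onErrorReturn(72)
            .collectList()
            .block();
    }

    // Recover with -1 only for IllegalArgumentException; 72 covers any other error.
    static List<Integer> recoverByType() {
        return Flux.just(2, 7, 10)
            .concatWith(Flux.error(new IllegalArgumentException("bad input")))
            .onErrorReturn(IllegalArgumentException.class, -1)
            .onErrorReturn(72)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(recoverWithFallback()); // prints [2, 7, 10, 72]
        System.out.println(recoverByType());       // prints [2, 7, 10, -1]
    }
}
```

Note that the fallback value replaces the error rather than the lost elements: 12 is not emitted in either case.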
Handling exceptions in Project Reactor with the onErrorResume() operator
The onErrorResume() method accepts a Function that receives the error and returns a fallback Publisher to switch to, which in this case is a Mono.
Example
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    class ReactiveJavaTutorial {
        public static void main(String[] args) {
            Flux.just(2, 7, 10)
                .concatWith(Flux.error(new RuntimeException("Exception occurred")))
                .concatWith(Mono.just(12))
                .onErrorResume(err -> {
                    System.out.println("Error caught: " + err);
                    return Mono.just(12); // fallback publisher used after the error
                })
                .log()
                .subscribe();
        }
    }
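The fallback does not have to be a Mono. The sketch below, under the assumption that a multi-element fallback is wanted, switches to a Flux and uses the `onErrorResume(Class, Function)` overload so that only one exception type triggers the fallback. The class name, method name, exception message, and fallback values are all illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class OnErrorResumeSketch {

    // Switches to a fallback Flux only when an IllegalStateException occurs.
    // Hypothetical helper, introduced only for this illustration.
    static List<Integer> resumeWithFallback() {
        return Flux.just(2, 7, 10)
            .concatWith(Flux.error(new IllegalStateException("primary source failed")))
            .onErrorResume(IllegalStateException.class, ex -> Flux.just(100, 200))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(resumeWithFallback()); // prints [2, 7, 10, 100, 200]
    }
}
```

An error of any other type would not match the class filter and would still reach the subscriber as onError.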
Handle exceptions with the onErrorContinue() operator
onErrorContinue() catches the exception, drops the element that caused it, and lets the Publisher continue emitting the remaining elements. Note that it only takes effect for upstream operators that support this mode, such as map() in the example below.
Example
    import reactor.core.publisher.Flux;

    class ReactiveJavaTutorial {
        public static void main(String[] args) {
            Flux.just(2, 7, 10, 8, 12, 22, 24)
                .map(element -> {
                    if (element == 8) {
                        throw new RuntimeException("Exception occurred!");
                    }
                    return element;
                })
                .onErrorContinue((ex, element) -> {
                    // called for the failing element, which is then dropped
                    System.out.println("Exception caught: " + ex);
                    System.out.println("The element that caused the exception is: " + element);
                })
                .log()
                .subscribe();
        }
    }
Handle exceptions with the onErrorMap() operator
With onErrorMap(), the code can't recover from the exception. This operator only catches the exception and transforms it from one type to another. The transformed exception is then propagated downstream as the onError signal.
Example
    import reactor.core.publisher.Flux;

    class ReactiveJavaTutorial {
        public static void main(String[] args) {
            Flux.just(2, 7, 10, 8, 12, 22, 24)
                .map(element -> {
                    if (element == 8) {
                        throw new RuntimeException("Exception occurred!");
                    }
                    return element;
                })
                .onErrorMap(ex -> {
                    System.out.println("Exception caught: " + ex);
                    // wrap the original exception, keeping it as the cause
                    return new CustomException(ex.getMessage(), ex);
                })
                .log()
                .subscribe();
        }
    }

    class CustomException extends Exception {
        public CustomException(String message, Throwable exception) {
            super(message, exception);
        }
    }
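To check what the subscriber actually receives after the mapping, the following sketch records the error's type and message in an error consumer. It uses the `onErrorMap(Class, Function)` overload, and the exception types, messages, class name, and method name are chosen for illustration rather than taken from the example above.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class OnErrorMapSketch {

    // Returns a description of the error that reaches the subscriber.
    // Hypothetical helper, introduced only for this illustration.
    static List<String> observedErrors() {
        List<String> seen = new ArrayList<>();
        Flux.just(2, 7, 10, 8)
            .map(element -> {
                if (element == 8) {
                    throw new IllegalArgumentException("bad element: " + element);
                }
                return element;
            })
            // Translate only IllegalArgumentException; other errors pass through unchanged.
            .onErrorMap(IllegalArgumentException.class,
                ex -> new IllegalStateException("wrapped: " + ex.getMessage(), ex))
            .subscribe(
                value -> { /* values are ignored in this sketch */ },
                error -> seen.add(error.getClass().getSimpleName() + ": " + error.getMessage()));
        return seen;
    }

    public static void main(String[] args) {
        // prints [IllegalStateException: wrapped: bad element: 8]
        System.out.println(observedErrors());
    }
}
```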
Handle exceptions with the doOnError() operator
This operator is one of the doOn callbacks in Project Reactor. It doesn't change the original sequence. With this operator, we can observe the exception and perform a side effect, such as logging, when the onError signal is sent from the Publisher. The code does not recover, however, and the error is still propagated to the Subscriber.
Example
    import reactor.core.publisher.Flux;

    class ReactiveJavaTutorial {
        public static void main(String[] args) {
            Flux.just(1, 2, 3)
                .concatWith(Flux.error(new RuntimeException("Exception occurred.")))
                .doOnError(ex -> System.out.println("Exception caught: " + ex)) // catch and print the exception
                .log()
                .subscribe();
        }
    }
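Because doOnError() leaves the error in place, it is often paired with one of the recovery operators. The sketch below is one possible combination, not the only one: it records the side effect first and then recovers with onErrorReturn(). The class name, method name, and the fallback value -1 are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnErrorSketch {

    // Records the side effect, then recovers so that block() does not throw.
    // Hypothetical helper, introduced only for this illustration.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Integer> values = Flux.just(1, 2, 3)
            .concatWith(Flux.error(new RuntimeException("Exception occurred.")))
            .doOnError(ex -> log.add("side effect: " + ex.getMessage())) // observes only
            .onErrorReturn(-1)                                           // actual recovery
            .collectList()
            .block();
        log.add("values: " + values);
        return log;
    }

    public static void main(String[] args) {
        // prints [side effect: Exception occurred., values: [1, 2, 3, -1]]
        System.out.println(run());
    }
}
```

Without the onErrorReturn() call, block() would rethrow the exception after the side effect has run.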