void accept(Flux<String> input) {
    input.doOnNext(message -> System.out.println("Received message: " + message))
         .flatMap(message -> Mono.just(message)
                                 .map(Integer::valueOf)
                                 .doOnSuccess(x -> System.out.println("Processed successfully: " + message))
                                 .onErrorMap(IllegalArgumentException::new))
         .onErrorContinue((throwable, o) -> System.out.println("Error: " + throwable.getClass()))
         .subscribe();
}
Expected Behavior
We expect errors thrown by, say, Integer.valueOf() to be mapped to IllegalArgumentExceptions and the stream consumption to continue.
Actual Behavior
Errors are not mapped to IllegalArgumentExceptions, and a message is logged claiming that some invalid input was processed successfully.
We fixed it by replacing map(Integer::valueOf) with flatMap(x -> Mono.just(Integer.valueOf(x))); however, we feel this may only be a workaround. Imagine code like flatMap(repository::save), where you have no control over the operators used by the underlying implementation.
While adding some extra logging, we observed that when an error occurred, a new element was requested (request(1)) from Mono.just(). Does that make sense? Is it expected or documented somewhere? It is worth mentioning that unexpected behavior also occurs with other operators, e.g., switchIfEmpty() (instead of doOnSuccess()).
Steps to Reproduce
@Test
void reproCase() {
    accept(Flux.fromStream(Stream.of("1", "2", "three", "4", "5")));
}
A full example may be found in my repo.
Your Environment
Reactor version used: reactor-core version 3.4.2
onErrorContinue is an unsafe operator that breaks the Reactive Streams specification. Its documentation explicitly states that A) it influences operators upstream of it and B) not all such operators support it.
Don't rely on it for generic chains; only use it in contexts where you can be 100% sure that the impacted operators upstream of it support it. In general, favor composing other onError handling operators (like onErrorResume(t -> Mono.empty()), which might seem more involved but give more guarantees of applying, since they work within the boundaries of the Reactive Streams spec).
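As a rough illustration of that advice, the sketch below rewrites the reporter's accept() chain so that the error is handled inside each inner Mono with onErrorResume instead of by a downstream onErrorContinue. The class name OnErrorResumeSketch and the method parseAll are hypothetical names introduced for this example, and returning a Flux<Integer> (rather than subscribing inside the method) is an assumption made so the result can be inspected.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class OnErrorResumeSketch {

    // Hypothetical variant of accept(): each message is parsed in its own inner Mono,
    // and a failure is mapped and then swallowed there, so the outer Flux keeps going.
    static Flux<Integer> parseAll(Flux<String> input) {
        return input
            .doOnNext(message -> System.out.println("Received message: " + message))
            .flatMap(message -> Mono.just(message)
                .map(Integer::valueOf)
                .doOnSuccess(x -> System.out.println("Processed successfully: " + message))
                // Wrap whatever was thrown (here a NumberFormatException) as the cause.
                .onErrorMap(IllegalArgumentException::new)
                // Log the mapped error and complete this inner Mono empty.
                .onErrorResume(t -> {
                    System.out.println("Error: " + t.getClass());
                    return Mono.empty();
                }));
    }

    public static void main(String[] args) {
        List<Integer> parsed = parseAll(Flux.just("1", "2", "three", "4", "5"))
            .collectList()
            .block();
        System.out.println(parsed);
    }
}
```

Because no onErrorContinue sits downstream, the inner map() signals the error in the usual way, so onErrorMap and onErrorResume both see it and doOnSuccess is not invoked for "three". The trade-off is that the recovery logic has to be repeated in every inner chain that needs it.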
We intend to make that even more prominent in a later version by deprecating the onErrorContinue and doOnDiscard operators in order to group them under an unsafe sub-group.