Expected Behavior

Exception handling should behave consistently regardless of whether some internal method uses map() or flatMap().

We have a reactive Kafka consumer which, in a very simplified form, looks like the following:

void accept(Flux<String> input) {
  input.doOnNext(message -> System.out.println("Received message: " + message))
    // Each message is processed in its own inner Mono
    .flatMap(message -> Mono.just(message)
      .map(Integer::valueOf)
      .doOnSuccess(x -> System.out.println("Processed successfully: " + message))
      .onErrorMap(IllegalArgumentException::new))
    .onErrorContinue((throwable, o) -> System.out.println("Error: " + throwable.getClass()))
    .subscribe();
}

We expect errors thrown by, say, Integer.valueOf() to be mapped to IllegalArgumentExceptions and the stream consumption to continue.

Actual Behavior

Errors are not mapped to IllegalArgumentExceptions, and a message is logged saying that the invalid input was processed successfully.

We fixed it by replacing map(Integer::valueOf) with flatMap(x -> Mono.just(Integer.valueOf(x))). However, we feel this may be only a workaround. Imagine that you have code like flatMap(repository::save), where you have no control over the operators used by the underlying implementation.

While adding some extra logging, we observed that in case of an error a new element was requested (request(1)) from Mono.just(). Does that make sense? Is it expected, or documented somewhere? It is worth mentioning that similarly unexpected behavior occurs with other operators, e.g., switchIfEmpty() (instead of doOnSuccess()).

Steps to Reproduce

@Test
void reproCase() {
  accept(Flux.fromStream(Stream.of("1", "2", "three", "4", "5")));
}

A full example may be found in my repo.

Your Environment

  • Reactor version used: reactor-core version 3.4.2

onErrorContinue is an unsafe operator that breaks the Reactive Streams specification. Its documentation explicitly states that A) it influences operators upstream of it and B) not all such operators support it.

Don't rely on it for generic chains; only use it in contexts where you can be 100% sure that the impacted operators upstream of it support it. In general, favor composing other onError handling operators, like onErrorResume(t -> Mono.empty()). These might seem more involved, but they give a stronger guarantee of applying since they work within the boundaries of the Reactive Streams spec.

We intend to make that even more prominent in a later version by deprecating the onErrorContinue and doOnDiscard operators in order to group them under an unsafe sub-group.
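
For illustration, here is a minimal sketch of what composing onErrorResume(t -> Mono.empty()) inside the flatMap could look like for the consumer from the report. The class name OnErrorResumeSketch and the method parseAll are hypothetical, and the method returns the parsed values instead of subscribing so that the result can be inspected. This is one possible arrangement under those assumptions, not necessarily the exact fix for the reporter's code.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this sketch.
public class OnErrorResumeSketch {

    // Hypothetical helper: parses each message in its own inner Mono,
    // so a failure stays local to the element that caused it.
    static Flux<Integer> parseAll(Flux<String> input) {
        return input.flatMap(message -> Mono.just(message)
            .map(Integer::valueOf)
            // Wrap whatever was thrown in an IllegalArgumentException
            .onErrorMap(IllegalArgumentException::new)
            // Log the mapped error before swallowing it
            .doOnError(e -> System.out.println("Error: " + e.getClass()))
            // Replace the failed inner Mono with an empty one,
            // so the outer Flux moves on to the next message
            .onErrorResume(e -> Mono.empty()));
    }

    public static void main(String[] args) {
        List<Integer> parsed = parseAll(Flux.just("1", "2", "three", "4", "5"))
            .collectList()
            .block();
        System.out.println(parsed);
    }
}
```

Because the error is handled inside the inner Mono with regular error signals, this does not depend on upstream operators supporting onErrorContinue. The trade-off is that every inner chain has to carry its own error handling.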