Java Reactive Programming — Flux vs Mono

Overview:

Publisher:

  • A Stream is a pipeline of computational operations through which 0..N elements are conveyed from a data source to produce the desired result.
  • An Optional is similar to a Stream, but it holds either 0 or 1 element.
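To make the 0..N versus 0..1 distinction concrete, here is a small sketch that uses only the JDK. The class and method names (StreamVsOptional, squaresAbove, firstAbove) are made up for this illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class StreamVsOptional {

    // A Stream may carry any number of elements, including none.
    static List<Integer> squaresAbove(List<Integer> source, int threshold) {
        return source.stream()
                .filter(i -> i > threshold)
                .map(i -> i * i)
                .collect(Collectors.toList());
    }

    // An Optional carries at most one element.
    static Optional<Integer> firstAbove(List<Integer> source, int threshold) {
        return source.stream()
                .filter(i -> i > threshold)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        System.out.println(squaresAbove(numbers, 2)); // [9, 16, 25]
        System.out.println(firstAbove(numbers, 2));   // Optional[3]
        System.out.println(firstAbove(numbers, 9));   // Optional.empty
    }
}
```

Keep this pairing in mind: Flux and Mono, covered in the sections that follow, have the same 0..N and 0..1 shapes.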

Stream Pipeline:

//simple list
List<Integer> intList = Arrays.asList(1,2,3,4,5);
//creating stream from list
Stream<Integer> intStream = intList.stream()
.filter(i -> i > 2 )
.map(i -> i * i);
List<Integer> list1 = intStream
.collect(Collectors.toList());
//print
System.out.println(list1);

//output
[9, 16, 25]
//second terminal operation on the same stream - throws IllegalStateException
List<Integer> list2 = intStream
.collect(Collectors.toList());
  • A stream pipeline can have any number of intermediate operations, but only one terminal operation. That is why the second collect fails with an IllegalStateException: the first collect already consumed the stream, and a consumed stream cannot be reused.
  • Stream pipeline is synchronous.
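The one-terminal-operation rule can be checked with a few lines of plain JDK code. This is only a sketch; StreamReuse and its method names are invented for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuse {

    // Returns true when the second terminal operation on the same Stream is rejected.
    static boolean secondCollectFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5)
                .filter(i -> i > 2)
                .map(i -> i * i);
        stream.collect(Collectors.toList());      // first terminal operation consumes the stream
        try {
            stream.collect(Collectors.toList());  // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Building a new pipeline from the source for every terminal operation avoids the problem.
    static List<Integer> collectFresh(List<Integer> source) {
        return source.stream()
                .filter(i -> i > 2)
                .map(i -> i * i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        System.out.println(secondCollectFails());  // true
        System.out.println(collectFresh(source));  // [9, 16, 25]
        System.out.println(collectFresh(source));  // [9, 16, 25]
    }
}
```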

Flux:

Flux.empty()
.subscribe(i -> System.out.println("Received : " + i));

//No output
  • The subscribe method accepts a Consumer<T> in which we define what to do with each emitted element.
Flux.just(1)
.subscribe(i -> System.out.println("Received : " + i));

//Output
Received : 1
  • A Flux can have more than one observer. One observer might collect all elements into a list while another logs the element details.
Flux<Integer> flux = Flux.just(1);
//Observer 1
flux.subscribe(i -> System.out.println("Observer-1 : " + i));
//Observer 2
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

//Output
Observer-1 : 1
Observer-2 : 1
  • just with arbitrary elements
Flux.just('a', 'b', 'c')
.subscribe(i -> System.out.println("Received : " + i));

//Output
Received : a
Received : b
Received : c
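  • The output below shows that this pipeline runs asynchronously. That is not Reactor's default: a pipeline normally runs on the thread that subscribes. Here delayElements moves delivery to another thread (the parallel scheduler by default), so the main thread prints Ends before any element arrives.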
System.out.println("Starts");

//flux emits one element per second
Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
.delayElements(Duration.ofSeconds(1));
//Observer 1 - takes 500ms to process
flux
.map(Character::toUpperCase)
.subscribe(i -> {
sleep(500);
System.out.println("Observer-1 : " + i);
});
//Observer 2 - process immediately
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

System.out.println("Ends");

//Block the main thread here (for example with a sleep) - otherwise the program exits after printing only the Starts and Ends messages
//Output
Starts
Ends
Observer-2 : a
Observer-1 : A
Observer-2 : b
Observer-1 : B
Observer-2 : c
Observer-2 : d
Observer-1 : C
Observer-1 : D
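The snippet above leaves out the sleep helper and the line that keeps the main thread alive. One possible self-contained version is sketched below. It assumes reactor-core is on the classpath. The sleep helper, the CountDownLatch used for waiting, and the shortened delays are my own choices for the illustration, not part of the original example.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;

class TwoObservers {

    // Hypothetical helper standing in for the sleep(...) call used above.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>(); // written from several threads
        CountDownLatch done = new CountDownLatch(2);         // one count per observer

        events.add("Starts");
        Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
                .delayElements(Duration.ofMillis(100));      // shortened from 1 second

        // Observer 1: slow consumer
        flux.map(Character::toUpperCase)
            .subscribe(i -> { sleep(50); events.add("Observer-1 : " + i); },
                       err -> done.countDown(),
                       done::countDown);

        // Observer 2: processes immediately
        flux.subscribe(i -> events.add("Observer-2 : " + i),
                       err -> done.countDown(),
                       done::countDown);

        events.add("Ends");
        try {
            done.await(10, TimeUnit.SECONDS);                // keep the calling thread alive
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Starts and Ends are recorded first because both subscribe calls return immediately. The interleaving of the two observers afterwards depends on timing.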
  • We have 2 observers subscribed to the source, which is why onSubscribe appears twice in the log below.
  • request(32): here 32 is the default buffer size. The downstream requests 32 elements at a time to buffer and emit.
  • Elements are emitted one by one.
  • Once all the elements are emitted, onComplete is invoked to tell the observers not to expect any more elements.
Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
.log()
.delayElements(Duration.ofSeconds(1));
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()
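Every log line above is tagged (main): the source itself emits on the subscribing thread, and it is delayElements that hands the elements over to another thread. A rough way to observe this is to record the thread name inside map. The sketch below assumes reactor-core on the classpath; ThreadingDemo and its method names are invented for the example.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class ThreadingDemo {

    // No time-based operator: elements are processed on the thread that subscribes.
    static List<String> threadsWithoutDelay() {
        return Flux.just('a', 'b')
                .map(c -> Thread.currentThread().getName())
                .collectList()
                .block();
    }

    // delayElements re-emits each element from a scheduler worker thread.
    static List<String> threadsWithDelay() {
        return Flux.just('a', 'b')
                .delayElements(Duration.ofMillis(10))
                .map(c -> Thread.currentThread().getName())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(threadsWithoutDelay()); // names equal to the calling thread
        System.out.println(threadsWithDelay());    // names of scheduler worker threads
    }
}
```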
  • The subscribe method can accept additional parameters to handle the error and completion signals. So far we have only consumed the elements received through the pipeline, but the pipeline could also fail with an exception. We can pass the handlers as shown here.
subscribe(
i -> System.out.println("Received :: " + i),
err -> System.out.println("Error :: " + err),
() -> System.out.println("Successfully completed"))
  • Let's take this example, which simply divides 10 by each element. We get the output below, as expected.
Flux.just(1,2,3)
.map(i -> 10 / i)
.subscribe(
i -> System.out.println("Received :: " + i),
err -> System.out.println("Error :: " + err),
() -> System.out.println("Successfully completed"));

//Output
Received :: 10
Received :: 5
Received :: 3
Successfully completed
  • Now if we slightly modify the map operation as shown here, the second element causes a division by zero, which throws an ArithmeticException (a RuntimeException). The error handler deals with it cleanly, without an ugly try/catch block.
Flux.just(1,2,3)
.map(i -> i / (i-2))
.subscribe(
i -> System.out.println("Received :: " + i),
err -> System.out.println("Error :: " + err),
() -> System.out.println("Successfully completed"));

//Output
Received :: -1
Error :: java.lang.ArithmeticException: / by zero
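Notice that the output stops at the error: the third element is never processed and the completion handler is not called, because an error is a terminal signal. The sketch below records the signals in a list to make that visible. It assumes reactor-core on the classpath; ErrorSignals is a name chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class ErrorSignals {

    // Records every signal the subscriber receives, in order.
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        Flux.just(1, 2, 3)
            .map(i -> i / (i - 2))   // the second element divides by zero
            .subscribe(
                i -> signals.add("next:" + i),
                err -> signals.add("error:" + err.getClass().getSimpleName()),
                () -> signals.add("complete"));
        return signals;              // the pipeline ran synchronously, so the list is already filled
    }

    public static void main(String[] args) {
        System.out.println(run());   // [next:-1, error:ArithmeticException]
    }
}
```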
  • fromArray: when you have an array. just also works here, because it accepts varargs.
String[] arr = {"Hi", "Hello", "How are you"};

Flux.fromArray(arr)
.filter(s -> s.length() > 2)
.subscribe(i -> System.out.println("Received : " + i));

//Output
Received : Hello
Received : How are you
  • fromIterable: when you have a collection of elements and would like to pass them through a Flux pipeline.
List<String> list = Arrays.asList("vins", "guru");
Flux<String> stringFlux = Flux.fromIterable(list)
.map(String::toUpperCase);
  • fromStream: if you have a stream of elements.
List<String> list = Arrays.asList("vins", "guru");
Flux<String> stringFlux = Flux.fromStream(list.stream())
.map(String::toUpperCase);
  • Be careful with Streams! A Flux can have more than 1 observer, but the code below fails with an error saying that the stream has already been operated upon or closed, because both observers share the same one-shot Stream.
//observer-1
stringFlux
.map(String::length)
.subscribe(i -> System.out.println("Observer-1 :: " + i));
//observer-2
stringFlux
.subscribe(i -> System.out.println("Observer-2 :: " + i));
  • The above problem can be fixed by passing a Supplier<Stream>, so that each observer gets a fresh stream.
Flux.fromStream(() -> list.stream())
.map(String::toUpperCase);
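Put together with the two observers from before, the Supplier version might look like the sketch below. It assumes reactor-core on the classpath, and the class name FromStreamSupplier is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class FromStreamSupplier {

    static List<String> subscribeTwice() {
        List<String> list = Arrays.asList("vins", "guru");
        // The supplier is invoked once per subscription, so each observer gets its own Stream.
        Flux<String> stringFlux = Flux.fromStream(() -> list.stream())
                .map(String::toUpperCase);

        List<String> received = new ArrayList<>();
        // observer-1
        stringFlux
                .map(String::length)
                .subscribe(i -> received.add("Observer-1 :: " + i));
        // observer-2
        stringFlux
                .subscribe(s -> received.add("Observer-2 :: " + s));
        return received;
    }

    public static void main(String[] args) {
        subscribeTwice().forEach(System.out::println);
        // Observer-1 :: 4
        // Observer-1 :: 4
        // Observer-2 :: VINS
        // Observer-2 :: GURU
    }
}
```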
  • range
//Emits 5 consecutive numbers starting from 3: 3, 4, 5, 6, 7
Flux.range(3, 5)
  • Flux.create
  • Flux.generate
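The last two factory methods are only named above, so here is a minimal sketch of how they are typically used. It assumes reactor-core on the classpath. The class GenerateAndCreate, its method names, and the squares example are my own illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class GenerateAndCreate {

    // generate: synchronous, at most one next(...) per round, with state carried between rounds.
    // Assumes count >= 1; otherwise the sequence never completes.
    static List<Integer> firstSquares(int count) {
        return Flux.<Integer, Integer>generate(
                    () -> 1,                        // initial state
                    (state, sink) -> {
                        sink.next(state * state);   // emit one element in this round
                        if (state == count) {
                            sink.complete();
                        }
                        return state + 1;           // state for the next round
                    })
                .collectList()
                .block();
    }

    // create: the sink may emit any number of elements, for example from a callback-style API.
    static List<String> fromCallback(List<String> items) {
        return Flux.<String>create(sink -> {
                    items.forEach(sink::next);
                    sink.complete();
                })
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstSquares(3));                 // [1, 4, 9]
        System.out.println(fromCallback(List.of("a", "b"))); // [a, b]
    }
}
```

As a rule of thumb, generate fits sources that can compute the next element on demand, while create fits bridging an existing listener or callback API.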

Mono:

  • just: emits a single item
Mono.just(1)
.subscribe(System.out::println);
  • Both Flux and Mono implement the Publisher<T> interface.
Publisher<Integer> publisher1 = Mono.just(1);
Publisher<Integer> publisher2 = Flux.just(1,2,3);
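Because both types are Publishers, a method can accept a plain Publisher<T> and then view it through whichever API it needs, using Flux.from or Mono.from. A small sketch, assuming reactor-core (which brings in org.reactivestreams) on the classpath; PublisherDemo and its method names are invented for the example.

```java
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PublisherDemo {

    // Views any Publisher through the Flux API and collects everything it emits.
    static List<Integer> collectAll(Publisher<Integer> publisher) {
        return Flux.from(publisher).collectList().block();
    }

    // Views any Publisher through the Mono API: only the first element is kept.
    static Integer firstOnly(Publisher<Integer> publisher) {
        return Mono.from(publisher).block();
    }

    public static void main(String[] args) {
        System.out.println(collectAll(Mono.just(1)));       // [1]
        System.out.println(collectAll(Flux.just(1, 2, 3))); // [1, 2, 3]
        System.out.println(firstOnly(Flux.just(1, 2, 3)));  // 1
    }
}
```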
  • Using Callable/Supplier
Mono.fromCallable(() -> 1);
Mono.fromSupplier(() -> "a");
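One practical difference from just is that the Callable or Supplier is not invoked when the Mono is created, only when someone subscribes, and it is invoked again for every subscription. The sketch below counts the invocations to show this. It assumes reactor-core on the classpath; LazyMono and its method name are made up for the illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class LazyMono {

    // Returns {invocations before any subscription, invocations after two subscriptions}.
    static int[] countInvocations() {
        AtomicInteger calls = new AtomicInteger();
        Mono<Integer> mono = Mono.fromCallable(() -> calls.incrementAndGet());

        int before = calls.get();   // nothing has run yet
        mono.block();               // first subscription invokes the callable
        mono.block();               // second subscription invokes it again
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = countInvocations();
        System.out.println(counts[0] + " -> " + counts[1]); // 0 -> 2
    }
}
```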
  • fromRunnable: a Runnable does not accept any parameter and does not return anything either. So what do you think the code below will do?
Mono.fromRunnable(() -> System.out.println("Hello"))
.subscribe(i -> System.out.println("Received :: " + i));
  • The above code just prints “Hello” and nothing else happens, because there is no item to emit. But if we add the error and complete handlers, we get the output below. This is helpful if we need to be notified when a runnable has completed.
Mono.fromRunnable(() -> System.out.println("Hello"))
.subscribe(
i -> System.out.println("Received :: " + i),
err -> System.out.println("Error :: " + err),
() -> System.out.println("Successfully completed"));

//Output
Hello
Successfully completed

Summary:

Principal Software Engineer — passionate about software architectural design, microservices.

Vinoth Selvaraj
