Java Reactive Programming — Flux Create vs Flux Generate
Overview:
As part of Reactive Programming series, In this article, Lets take a look at the difference between Create and Generate methods of Flux. We have been discussing about Reactive Programming concepts / Reactor library. If you have not read previous articles, please take a look at them first.
- Reactive Programming — A Simple Introduction
- Reactive Programming — Creating Sequences — Flux vs Mono
- Reactive Programming — Publisher Types — Cold Vs Hot
Both Create and Generate methods are used to programmatically generate sequences for Flux/Mono. But what is the difference between these two?
Flux Create:
The create method accepts a FluxSink<T> consumer. That is, you would be given an instance of the FluxSink using which you can keep on emitting O …. N elements to the downstream subscribers. Each subscriber would get an instance of FluxSink to emit elements (Cold subscribers).
Lets consider this example to create a sequence using Flux.create.
Now lets assume that we have 2 downstream subscribers.
If we run the above code, I get the below output.
going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
First :: 0
Second:: 0
First :: 1
First :: 2
Second:: 1
First :: 3
First :: 4
Second:: 2
Second:: 3
Second:: 4
What we can understand from the above output is,
- Each observer gets its own FluxSink instance which is expected as we create a Cold publisher.
- Create method does not wait for the observer to process the elements. It emits the elements even before observers start processing the elements.
Now, the obvious question could be, What if the observer can not keep up? Create method accepts one more parameter which defines the Overflow strategy & what needs to be done. Default behavior is buffer.
We could pass any of these values. Check the API here.
We can also get the reference of the FluxSink instance and emit elements outside the create method as and when we need. it does not have to happen inside the create method.
Simple Implementation Of Consumer Of FluxSink :
Emitting Elements:
Output:
First :: 0
Second:: 0
First :: 1
First :: 2
Second:: 1
First :: 3
First :: 4
Second:: 2
Second:: 3
Second:: 4
Multi-Thread Asynchronous Emitting:
I can also use multiple threads to emit elements via my FluxSink to the downstream subscribers.
Runnable runnable = () -> {
IntStream.range(0, 5)
.forEach(fluxSinkConsumer::publishEvent);
};
for (int i = 0; i < 3; i++) {
new Thread(runnable).start();
}
Flux Generate:
The generate method is slightly different from the create method as shown below. Here it accepts a Consumer of SynchronousSink<T>. In the above create method, we were able to pass a consumer which could emit O …… N elements. But with generate method, we can pass a consumer which could emit only one element!!
Does it mean this flux can emit only one element at the max?
No. This generate method can also emit potentially infinite number of elements. But what we mean here is that the consumer block can emit only one element along with an optional complete or error call. That is, generate method keeps on emitting elements one-by-one based on the demand from the downstream by invoking the consumer. Consumer itself cannot have loop to emit elements. If observers are not interested in processing further elements, generate would not emit elements.
To understand this behavior, run these codes one by one.
Create Behavior:
- In the above Consumer<FluxSink<Integer>> we have a loop which keeps on emitting elements
- We could see only one ‘Flux create’ message
- We could see 100 ‘going to emit’ statements
- Then all the 100 ‘First consumed’ statements one by one
Generate Behavior:
- We could see 32 ‘Flux generate’ messages
- We could see 23 ‘First consumed’ messages
- Again a bunch of ‘Flux generate’ messages
- Again a bunch of ‘First consumed’ messages
What is going on here?? If this behavior is very confusing, basically this is what happens. Generate method emits elements based on the demand from the downstream. It generates 32 elements first and buffers it. As and when downstream starts processing elements and when the buffer size drops the below threshold, it emits few more elements. This process is repeated again and again. At one point, if the observer stops processing elements, Flux.generate will also stop emitting elements. So this generate method is aware of downstream observers processing speed.
Lets try this code. Here we try to emit 2 elements at a time.
The output is as shown here! It is illegal to emit more than one element using a SynchronousSink.
Flux generate
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: More than one call to onNext
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: More than one call to onNext
Caused by: java.lang.IllegalStateException: More than one call to onNext
As we can not emit more than one element, It does not make sense to get the reference of SynchronousSink as we did for FluxSink above to programmatically emit elements.
Generate method can also maintain state if required.
The above code produced below output.
Consumed ::A
Consumed ::B
Consumed ::C
Consumed ::D
Consumed ::E
Consumed ::F
Consumed ::G
Consumed ::H
Consumed ::I
Consumed ::J
Consumed ::K
Consumed ::L
Consumed ::M
Consumed ::N
Consumed ::O
Consumed ::P
Consumed ::Q
Consumed ::R
Consumed ::S
Consumed ::T
Consumed ::U
Consumed ::V
Consumed ::W
Consumed ::X
Consumed ::Y
Consumed ::Z
Summary — Create vs Generate:
These are the high level differences between these two methods. Hopefully you have a better idea now when to use what!
Happy learning 🙂