I am trying to understand the behavior of the concatMap() operator. I wrote some example code:
import './style.css';
import { concatMap, fromEvent, tap } from 'rxjs';
import { first } from 'rxjs/operators';

const button1 = document.getElementById('button1');
const button2 = document.getElementById('button2');
const button3 = document.getElementById('button3');

const obs1 = fromEvent(button1, 'click');
const obs2 = fromEvent(button2, 'click');
const obs3 = fromEvent(button3, 'click');

obs1
  .pipe(
    concatMap((obs1value) => {
      console.log('obs1:', obs1value);
      return obs2; // wait for this to complete before merging next value
    }),
    concatMap((obs2value) => {
      console.log('obs2', obs2value);
      return obs3; // wait for this to complete before merging next value
    })
  )
  .subscribe((obs3value) => {
    console.log('obs3:', obs3value);
  });

// button2.click() // nothing
// button3.click() // nothing
button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3
button1.click(); // nothing -???
button2.click(); // nothing -???
button3.click(); // obs3
button3.click(); // obs3
button3.click(); // obs3
Initially, after button1, button2 and button3 are clicked, I can see the print statements. After that, however, clicking button1 and button2 prints nothing; only button3 still produces output. The RxJS documentation says concatMap() would be "...waiting for each one to complete before merging the next". But since obs1, obs2 and obs3 have not completed yet, why do I see the print statements even once?
If I change the code slightly (https://stackblitz.com/edit/rxjs-zggqxc) so that the inner observable completes, as below:
obs3.pipe(first());
Now, when clicking button1 a second time, why is nothing printed?
button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3
button1.click(); // nothing -???
button2.click(); // obs2
button3.click(); // obs3
button3.click(); // nothing
button3.click(); // nothing
button2.click(); // obs2
Best Answer
When you call button1.click() a second time, the next notification is emitted, but obs2 hasn't completed yet, so the project function in concatMap() isn't invoked. Only once obs2 completes would concatMap() pop the oldest buffered notification and pass it to its project function.
If you log the next notifications coming from obs1, you'll see the event being emitted.
...
obs1.pipe(
  tap(console.log),
  concatMap((obs1value) => {
    console.log('obs1:', obs1value);
    return obs2; // wait for this to complete before merging next value
  }),
  ...
Notice you are using concatMap, with the emphasis on map. Mapping will not just concatenate all observables but replace (map) items from the previous observable with a sequence of other items.
Each item from your "outer" observable is mapped to another "inner" observable, and your map callback (the parameter of concatMap(...)) is executed once per item, at the moment that mapping actually occurs.
When are those mappings performed? concatMap will take one item from the outer observable and then defer handling all other items from the outer observable until the inner observable (returned by the map callback) completes. The outer observable's items are buffered until the inner observable completes.
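The buffering described above can be modelled without RxJS at all. The following is only a minimal sketch of the queueing idea, not RxJS's actual implementation; ConcatMapModel, outerNext and innerComplete are hypothetical names invented for this illustration.

```typescript
// Toy model of concatMap's queueing (NOT the real RxJS implementation).
// All names here (ConcatMapModel, outerNext, innerComplete) are hypothetical.
class ConcatMapModel<T> {
  private buffer: T[] = [];    // outer values waiting for their turn
  private innerActive = false; // true while an inner observable has not completed
  readonly log: string[] = [];

  constructor(private project: (value: T) => void) {}

  // Called for every value emitted by the outer observable.
  outerNext(value: T): void {
    if (this.innerActive) {
      this.buffer.push(value); // buffered: project is NOT invoked yet
      this.log.push(`buffered ${value}`);
    } else {
      this.run(value);
    }
  }

  // Called when the currently active inner observable completes.
  innerComplete(): void {
    this.innerActive = false;
    const next = this.buffer.shift(); // oldest buffered value first
    if (next !== undefined) {
      this.run(next);
    }
  }

  private run(value: T): void {
    this.innerActive = true;
    this.log.push(`project ${value}`);
    this.project(value);
  }
}

const model = new ConcatMapModel<string>(() => {});
model.outerNext('click 1'); // project runs immediately
model.outerNext('click 2'); // inner still active, so the value is buffered
model.innerComplete();      // only now is project invoked for 'click 2'
console.log(model.log);
// ['project click 1', 'buffered click 2', 'project click 2']
```

If innerComplete() is never called, which corresponds to an inner observable that never completes, 'click 2' stays in the buffer forever and the project function never runs for it.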
Side note: In a synchronous scenario this is roughly comparable to SelectMany from C# or flatMap from the Java Streams API.
Let me describe your first use scenario:
- Each item in obs1 will be replaced by the sequence of items from obs2.
  - When this replacement is evaluated by pressing button1 (you might have implemented it in a way which depends on the value you received from obs1!), the first log output becomes visible.
  - Further clicks on button1 will be buffered and not handled until obs2 has completed.
- Each item of obs2 will be replaced by the sequence of items from obs3.
  - Again, when the replacement gets evaluated by pressing button2, the second log output becomes visible.
  - Further clicks on button2 will be buffered and not handled until obs3 has completed.
Therefore, after pressing button1, button2 and button3 once each, you are essentially just subscribed to obs3. As obs3 never completes, you will indeed never see any output other than the one from obs3.
In your second scenario, the observable you map the items of obs2 to is obs3.pipe(take(1)) (the first() in your code behaves the same way here), which completes after one click on button3. As soon as that happens, the pipeline will continue to map items from obs2. The next mapping happens as soon as one item in obs2 becomes available (immediately if button2 was clicked in the meantime, otherwise as soon as it gets clicked), and then one click on button3 is awaited again.
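Both scenarios can be reproduced outside a browser with a small sketch like the one below. It is an assumption-laden stand-in for the original code: Subjects replace the fromEvent click streams, simulate is a hypothetical helper, and the click sequence is the one from the question's second listing.

```typescript
import { Observable, Subject, concatMap, take } from 'rxjs';

// Hypothetical helper: Subjects stand in for the three buttons' click streams.
// completeInner = false mirrors the first scenario (obs3 never completes),
// completeInner = true mirrors the second one (obs3 limited with take(1)).
function simulate(completeInner: boolean): string[] {
  const log: string[] = [];
  const b1 = new Subject<void>();
  const b2 = new Subject<void>();
  const b3 = new Subject<void>();
  const inner3: Observable<void> = completeInner ? b3.pipe(take(1)) : b3;

  b1.pipe(
    concatMap(() => {
      log.push('obs1');
      return b2; // never completes, so later b1 clicks stay buffered
    }),
    concatMap(() => {
      log.push('obs2');
      return inner3; // later b2 clicks wait until this completes
    })
  ).subscribe(() => log.push('obs3'));

  // Same click order as in the question's second listing.
  b1.next(); b2.next(); b3.next();
  b1.next(); b2.next(); b3.next();
  b3.next(); b3.next(); b2.next();
  return log;
}

console.log(simulate(false)); // ['obs1', 'obs2', 'obs3', 'obs3', 'obs3', 'obs3']
console.log(simulate(true));  // ['obs1', 'obs2', 'obs3', 'obs2', 'obs3', 'obs2']
```

In both runs 'obs1' is logged only once, because the first concatMap's inner observable (the button2 stream) never completes. Only the second run logs 'obs2' again, since take(1) lets the second concatMap's inner observable complete after each button3 click.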