I am trying to understand behavior of concatMap() operator. I wrote some example code

import './style.css';import { concatMap, fromEvent, tap } from 'rxjs';import { first } from 'rxjs/operators';const button1 = document.getElementById('button1');const button2 = document.getElementById('button2');const button3 = document.getElementById('button3');const obs1 = fromEvent(button1, 'click');const obs2 = fromEvent(button2, 'click');const obs3 = fromEvent(button3, 'click');obs1.pipe(concatMap((obs1value) => {console.log('obs1:', obs1value);return obs2; // wait for this to complete before merging next value}),concatMap((obs2value) => {console.log('obs2', obs2value);return obs3; // wait for this to complete before merging next value})).subscribe((obs3value) => {console.log('obs3:', obs3value);});// button2.click() // nothing// button3.click() // nothingbutton1.click(); // obs1button2.click(); // obs2button3.click(); // obs3button1.click(); // nothing -???button2.click(); // nothing -???button3.click(); // obs3button3.click(); // obs3button3.click(); // obs3

Initially after button 1, button 2 and button 3 are clicked, I can see the print statements. However after that for button1 and button2, there are no print statements, it is observed only in case of button3. I see RxJS documentation says concatMap() would be "...waiting for each one to complete before merging the next". But since obs1, obs2 and obs3 have not completed yet, why do I see the print statement even once?

If I change the code slightly https://stackblitz.com/edit/rxjs-zggqxc, to complete observable as below

obs3.pipe(first());

Now licking button1, why is nothing is printed?

button1.click(); // obs1button2.click(); // obs2button3.click(); // obs3button1.click(); // nothing -???button2.click(); // obs2button3.click(); // obs3button3.click(); // nothingbutton3.click(); // nothingbutton2.click(); // obs2
2

Best Answer


When you call button1.click(); second time the next notification is emitted but obs2 hasn't completed yet so the project function in concatMap() isn't invoked. Only if obs2 completed then concatMap() would pop the oldest notification stacked and passed it to its project function.

If you log next notifications comming from obs1 you'll see the event being emitted.

...obs1.pipe(tap(console.log),concatMap((obs1value) => {console.log('obs1:', obs1value);return obs2; // wait for this to complete before merging next value}),...

Notice you are using concatMap, with the emphasis on map.Mapping will not just concatenate all observables but replace (map) items from the previous observable with a sequence of other items.

For each item from your "outer" observable which is actually mapped to another "inner" observable, your map callback (parameter of concatMap(...)) is executed once when the mapping actually occurs.

When are those mappings performed? - concatMap will take one item from the outer observable and then defers handling all other items from the outer observable until the inner observable (returned by the map callback) completes. The outer observable's items are indeed buffered until the inner observable completes.

Side note: In a synchronous scenario this is roughly comparable to SelectMany from C# or flatMap from Java Streams API.

Let me describe your first use scenario:

  1. Each item in obs1 will be replaced by the sequence of items from obs2.
    • When this replacement is evaluated by pressing button1 (you might have implemented it in a way which depends on the value you received from obs1!), the first log output becomes visible.
    • Further clicks are on button1 will be ignored until obs2 has completed.
  2. Each item of obs2 will be replaced by the sequence of items from obs3.
    • Again, when the replacement gets evaluated by pressing button2, the second log output becomes visible.
    • Further clicks on button2 will be ignored until obs3 has completed.

Therefore, when pressing button1, button2 and button3 you are essentially just subscribed to obs3.As obs3 never finishes, you will indeed never see any other output.

In your second scenario the observable you map to obs2 is obs3.pipe(take(1)) which completes after one click on button3.As soon as that happens, the pipeline will will continue to map items from obs2. The next mapping happens as soon as one item in obs2 becomes available (which is immediately, if button2 has been clicked intermediately or later as soon as it gets clicked) and then one click to button3 is awaited again.