RxJS lossy and lossless pause/resume of the Observable

How to pause and resume an Observable RxJS stream

This article is largely based on the post by @kddsky, "Pausable Observables in RxJS and other backpressure techniques".

As that post explains, there are multiple ways to pause and resume an RxJS Observable stream. This post explores only two of them:

  1. Lossy pause/resume, a.k.a. the pausable operator in RxJS 4
  2. Lossless pause/resume, a.k.a. the pausable buffered operator in RxJS 4

Expected End Result

TLDR: skip to the end of this page for source code on Stackblitz

RxJS 6 Lossy pause/resume implementation (pausable)

Source: https://thinkrx.io/gist/bf931768bbbdd4d895c41973199d4549/

Example usage: Imagine we have a stream of toaster messages (non-modal dialogs that appear and disappear). If many messages arrive in a short period of time, they would overwhelm the user. Instead, we can pause and resume the message stream to reduce the number of messages shown; anything emitted while the stream is paused is dropped.

import { NEVER, Observable, Subject } from 'rxjs';
import {
  switchMap,
} from 'rxjs/operators';

...

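// lossy implementation of pausing
testSubject: Subject<string> = new Subject();
pauser: Subject<boolean> = new Subject(); // true = paused, false = running
pausable: Observable<string>;
lossyValues: string[] = []; // values received while not paused

constructor() {
    // while paused, switch to NEVER so source values are dropped;
    // otherwise switch back to the source subject
    this.pausable = this.pauser.pipe(
      switchMap((paused) => {
        return paused ? NEVER : this.testSubject;
      })
    );

    // lossy
    this.pausable.subscribe((v) => {
      console.log(v);
      this.lossyValues.push(v);
    });

    // initial pauser value: start in the running (not paused) state
    this.pauser.next(false);
}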

switchMap has a cancelling effect.

Official explanation: On each emission, the previous inner observable (the result of the function you supplied) is cancelled and the new observable is subscribed to. You can remember this by the phrase "switch to a new observable".

Here switchMap is used to switch between NEVER (an Observable that emits neither values, nor errors, nor the completion notification) and our source Observable, testSubject. Because the subscription to testSubject is dropped while the stream is paused, anything it emits during that time is lost.
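To see the lossy behaviour in isolation, here is a minimal standalone sketch outside of Angular. The names source$, pauser$ and received are made up for this illustration, and it assumes RxJS 6 or later so that the rxjs/operators import path is available.

```typescript
import { NEVER, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical names, used only for this illustration.
const source$ = new Subject<string>();
const pauser$ = new Subject<boolean>(); // true = paused, false = running

// While paused we are subscribed to NEVER, so source values go nowhere.
const pausable$ = pauser$.pipe(
  switchMap((paused) => (paused ? NEVER : source$))
);

const received: string[] = [];
pausable$.subscribe((v) => received.push(v));

pauser$.next(false); // start running
source$.next('a');   // delivered
pauser$.next(true);  // pause
source$.next('b');   // dropped: nobody is subscribed to source$ right now
pauser$.next(false); // resume
source$.next('c');   // delivered

console.log(received); // ['a', 'c']
```

Note that nothing is delivered until pauser$ has emitted at least once, which is why the component above sends an initial false.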

RxJS 6 Lossless pause/resume implementation (pausableBuffered)

Source: https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd/

Example usage: Imagine uploading multiple files when the internet connection cuts out. We can use the buffered, lossless pause/resume to continue the upload once the connection comes back, without losing any files that were queued while the connection was down.

import { merge, Observable, Subject } from 'rxjs';
import {
  bufferToggle, // an operator: in RxJS 6 it is exported from 'rxjs/operators', not 'rxjs'
  distinctUntilChanged,
  filter,
  mergeMap,
  share,
  windowToggle,
} from 'rxjs/operators';

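// true = paused, false = running
pauseLosslessSubj$: Subject<boolean> = new Subject();
// ignore repeated identical states and share one subscription to the subject
pauseLossless$: Observable<boolean> = this.pauseLosslessSubj$.pipe(
    distinctUntilChanged(),
    share()
);
// emits when the stream is resumed (switched on)
pauseLosslessOn$: Observable<boolean> = this.pauseLossless$.pipe(
    filter((v) => !v)
);
// emits when the stream is paused (switched off)
pauseLosslessOff$: Observable<boolean> = this.pauseLossless$.pipe(
    filter((v) => !!v)
);
testSubjectLossess: Subject<string> = new Subject();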

...
constructor() {
    const result: Observable<string> = merge(
      this.testSubjectLossess.pipe(
        bufferToggle(this.pauseLosslessOff$, () => this.pauseLosslessOn$),
        mergeMap((x) => x)
      ),
      this.testSubjectLossess.pipe(
        windowToggle(this.pauseLosslessOn$, () => this.pauseLosslessOff$),
        mergeMap((x) => x)
      )
    );

    // subscribe to values
    result.subscribe((v) => {
      console.log(v);
    });
    // init lossless pauser values: pause, then resume, so that the
    // first window opens and values start flowing
    this.pauseLosslessSubj$.next(true);
    this.pauseLosslessSubj$.next(false);
}

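While the stream is paused, we buffer its values and emit them once the stream is resumed.

We merge two streams: bufferToggle collects the values emitted between a pause and the following resume and releases them as one batch, while windowToggle passes values straight through between a resume and the following pause. In both branches, mergeMap flattens the result back into individual values.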
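The same idea can be tried in a standalone script. This is only a sketch under a few assumptions: the names source$, pause$, paused$, on$, off$ and received are invented for the example, true means paused, and RxJS 6 or later is installed.

```typescript
import { merge, Subject } from 'rxjs';
import {
  bufferToggle,
  distinctUntilChanged,
  filter,
  mergeMap,
  share,
  windowToggle,
} from 'rxjs/operators';

// Hypothetical names, used only for this illustration.
const source$ = new Subject<string>();
const pause$ = new Subject<boolean>(); // true = paused, false = running

const paused$ = pause$.pipe(distinctUntilChanged(), share());
const on$ = paused$.pipe(filter((p) => !p)); // resumed
const off$ = paused$.pipe(filter((p) => p)); // paused

const result$ = merge(
  // paused: collect values and release them as a batch on resume
  source$.pipe(bufferToggle(off$, () => on$), mergeMap((batch) => batch)),
  // running: pass values straight through until the next pause
  source$.pipe(windowToggle(on$, () => off$), mergeMap((win) => win))
);

const received: string[] = [];
result$.subscribe((v) => received.push(v));

// pause, then resume, so that the first window opens
pause$.next(true);
pause$.next(false);

source$.next('a');  // running: delivered immediately
pause$.next(true);  // pause
source$.next('b');  // buffered
source$.next('c');  // buffered
pause$.next(false); // resume: the buffer is flushed
source$.next('d');  // running again

console.log(received); // ['a', 'b', 'c', 'd']
```

Compared with the lossy version, 'b' and 'c' are not dropped; they simply arrive later, when the stream is resumed.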

Demo:
https://angular-ivy-x6t8xh.stackblitz.io

Source code:
https://stackblitz.com/edit/angular-ivy-x6t8xh?embed=1&file=src/app/app.component.ts