NexusCS

RxJS

JavaScript libraries
RxJS is a reactive programming library for composing asynchronous and event-based programs using observable sequences. This guide covers RxJS v7.x.
featured

Basics

Observable

import { Observable } from "rxjs";

const source$ = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const subscription = source$.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.error(err),
  complete: () => console.log("Done"),
});

subscription.unsubscribe();

Create observables with custom logic.

Pipe Pattern

import { of } from "rxjs";
import { map, filter, tap } from "rxjs/operators";

of(1, 2, 3, 4, 5)
  .pipe(
    filter((x) => x % 2 === 0), // Keep even
    map((x) => x * 10), // Transform
    tap((x) => console.log(x)), // Side effect
  )
  .subscribe();
// Output: 20, 40

Chain operators with .pipe().

Subjects

Type Initial Value Replays On Complete
Subject ❌ None ❌ None Emits nothing
BehaviorSubject ✅ Required Last value (1) Emits last
ReplaySubject ❌ None Last N values Emits buffered
AsyncSubject ❌ None ❌ None Emits ONLY last value
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from "rxjs";

const subject = new Subject<number>();
const behavior = new BehaviorSubject<number>(0); // Requires initial value
const replay = new ReplaySubject<number>(3); // Replays last 3
const async = new AsyncSubject<number>(); // Only emits on complete

Subjects are both observable and observer.

Creation Operators

Common Creators

import {
  of,
  from,
  fromEvent,
  interval,
  timer,
  range,
  defer,
  EMPTY,
  NEVER,
  throwError,
} from "rxjs";

of(1, 2, 3); // Emit values
from([1, 2, 3]); // From array
from(fetch("/api")); // From promise
fromEvent(document, "click"); // From DOM event
interval(1000); // Every 1s: 0, 1, 2...
timer(2000); // After 2s: 0, complete
timer(2000, 1000); // After 2s, then every 1s
range(1, 5); // 1, 2, 3, 4, 5

Create observables from various sources.

Utility Creators

defer(() => from(fetch("/api"))); // Lazy — creates on subscribe
EMPTY; // Completes immediately
NEVER; // Never emits or completes
throwError(() => new Error("oops")); // Emits error

Special creators for edge cases.

ajax

import { ajax } from "rxjs/ajax";

ajax.getJSON("/api/users").subscribe((data) => console.log(data));

ajax({
  url: "/api/users",
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: { name: "John" },
}).subscribe((response) => console.log(response));

HTTP requests as observables.

Transformation Operators

map / scan

import { map, scan, reduce, pairwise, toArray } from "rxjs/operators";

// Transform each value
source$.pipe(map((x) => x * 2));

// Running accumulator (emits each step)
source$.pipe(scan((acc, val) => acc + val, 0));
// 1, 3, 6, 10...

// Final accumulator (emits once on complete)
source$.pipe(reduce((acc, val) => acc + val, 0));

// Previous + current pair
source$.pipe(pairwise());
// [1,2], [2,3], [3,4]...

// Collect all into array
source$.pipe(toArray());

Transform values in the stream.

Higher-Order Mapping

⭐ Most Important

import { switchMap, mergeMap, concatMap, exhaustMap } from "rxjs/operators";

// switchMap: Cancel previous inner (LATEST WINS)
search$.pipe(switchMap((term) => api.search(term)));
// Use for: typeahead, autocomplete, route changes

// mergeMap: Run all inner in parallel
clicks$.pipe(mergeMap(() => api.logClick()));
// Use for: independent parallel operations

// concatMap: Queue inner sequentially
saves$.pipe(concatMap((data) => api.save(data)));
// Use for: ordered operations (order matters)

// exhaustMap: Ignore while inner active
button$.pipe(exhaustMap(() => api.submit()));
// Use for: prevent duplicate submissions

Flatten higher-order observables.

Comparison

Operator Concurrent Cancels Prev Order Best For
switchMap 1 at a time ✅ Yes Latest Typeahead, latest request
mergeMap Unlimited ❌ No Any Parallel operations
concatMap 1 at a time ❌ No (queues) Sequential Order-sensitive ops
exhaustMap 1 at a time ❌ No (ignores) First Button click guards

Choose the right mapping operator.

buffer / window

import { bufferTime, bufferCount } from "rxjs/operators";

source$.pipe(bufferTime(1000)); // Collect for 1s, emit array
source$.pipe(bufferCount(5)); // Collect 5, emit array

Collect emissions into arrays.

Filtering Operators

Time-Based Filters

import {
  debounceTime,
  throttleTime,
  auditTime,
  sampleTime,
  distinctUntilChanged,
} from "rxjs/operators";

// Wait for silence (typeahead)
input$.pipe(debounceTime(300));

// Emit first, then once per interval
input$.pipe(throttleTime(300));

// Sample at interval
input$.pipe(auditTime(300));

// Only if value changed
input$.pipe(distinctUntilChanged());

// Custom comparison
input$.pipe(distinctUntilChanged((prev, curr) => prev.id === curr.id));

Control emission timing.

Value Filters

import {
  filter,
  take,
  takeUntil,
  takeWhile,
  skip,
  first,
  last,
  single,
  elementAt,
} from "rxjs/operators";

source$.pipe(filter((x) => x > 5)); // Predicate filter
source$.pipe(take(3)); // First 3 then complete
source$.pipe(takeUntil(destroy$)); // Until notifier emits ⭐
source$.pipe(takeWhile((x) => x < 10)); // While predicate true
source$.pipe(skip(2)); // Skip first 2
source$.pipe(first()); // First value + complete
source$.pipe(last()); // Last value (on complete)
source$.pipe(elementAt(2)); // Value at index 2

Filter emissions by condition.

distinct

import {
  distinct,
  distinctUntilChanged,
  distinctUntilKeyChanged,
} from "rxjs/operators";

source$.pipe(distinct()); // All-time unique
source$.pipe(distinctUntilChanged()); // Consecutive dupes
source$.pipe(distinctUntilKeyChanged("name")); // By property

Remove duplicate values.

Combination Operators

combineLatest

import { combineLatest, forkJoin, merge, concat, zip, race } from "rxjs";
import { withLatestFrom, startWith } from "rxjs/operators";

// Emit when ANY emits (needs all to emit once first)
combineLatest([obs1$, obs2$, obs3$]).subscribe(([val1, val2, val3]) =>
  console.log(val1, val2, val3),
);

⚠️ Won't emit until ALL sources have emitted at least once. Use startWith() for initial values.

forkJoin

// Wait for ALL to complete, emit last values
forkJoin({
  users: http.get("/users"),
  posts: http.get("/posts"),
}).subscribe(({ users, posts }) => {
  console.log(users, posts);
});

Like Promise.all for observables. ⚠️ Never emits if any inner observable doesn't complete!

merge / concat / zip / race

// merge: Emit all values as they arrive (interleaved)
merge(obs1$, obs2$).subscribe(val => ...);

// concat: Sequential (subscribe to next after previous completes)
concat(obs1$, obs2$).subscribe(val => ...);

// zip: Pair values by index
zip(obs1$, obs2$).subscribe(([a, b]) => ...);

// race: First to emit wins, others unsubscribed
race(obs1$, obs2$).subscribe(val => ...);

// withLatestFrom: Combine with latest from another
source$.pipe(
  withLatestFrom(other$)
).subscribe(([sourceVal, otherVal]) => ...);

// startWith: Prepend initial value
source$.pipe(startWith(0)).subscribe(val => ...);

Combine multiple observables.

Error Handling

catchError

import {
  catchError,
  retry,
  finalize,
  throwIfEmpty,
  timeout,
} from "rxjs/operators";
import { of, EMPTY } from "rxjs";

// Return fallback on error
source$.pipe(catchError((err) => of("fallback value")));

// Retry N times
source$.pipe(retry(3));

// Retry with config (v7+)
source$.pipe(retry({ count: 3, delay: 1000 }));

// Timeout
source$.pipe(
  timeout(5000), // Error if no emit in 5s
);

// Cleanup (always runs)
source$.pipe(finalize(() => console.log("Done or Error")));

Handle errors gracefully.

Error Placement Matters

// ⚠️ Inside switchMap: stream continues after error
source$.pipe(
  switchMap((val) =>
    api.fetch(val).pipe(
      catchError((err) => of(null)), // Fallback per request
    ),
  ),
);

// ⚠️ Outside switchMap: stream ends on error
source$.pipe(
  switchMap((val) => api.fetch(val)),
  catchError((err) => EMPTY), // Entire stream ends
);

Placement affects stream lifecycle.

Multicasting

share / shareReplay

import { share, shareReplay } from "rxjs/operators";

// Share execution among subscribers
const shared$ = source$.pipe(share());

// Share + replay last N values to late subscribers
const cached$ = source$.pipe(shareReplay(1));

// Common: cache HTTP response
const user$ = http
  .get("/api/user")
  .pipe(shareReplay({ bufferSize: 1, refCount: true }));

Convert cold observables to hot.

Cold vs Hot

Cold Observable:                  Hot Observable:
┌─ subscribe → starts execution  ┌─ already running
│  Each subscriber gets own copy  │  Subscribers share execution
│  Like watching a video on-demand│  Like live TV broadcast
└─ of(), from(), ajax()          └─ Subject, fromEvent(), share()

Understand execution models.

Utility Operators

tap

import { tap, delay, observeOn, subscribeOn } from "rxjs/operators";
import { asyncScheduler } from "rxjs";

// Side effects (logging, debugging)
source$.pipe(
  tap((val) => console.log("Before:", val)),
  map((val) => val * 2),
  tap((val) => console.log("After:", val)),
);

// Delay emissions
source$.pipe(delay(1000));

// Schedule execution
source$.pipe(observeOn(asyncScheduler));

Side effects without altering values.

firstValueFrom / lastValueFrom

import { firstValueFrom, lastValueFrom } from "rxjs";

// Convert observable to promise (replaces deprecated toPromise)
const value = await firstValueFrom(source$);
const last = await lastValueFrom(source$);

Convert observables to promises.

Custom Operators

import { pipe } from "rxjs";
import { filter, map } from "rxjs/operators";

// Create reusable operator
function multiplyEven(factor: number) {
  return pipe(
    filter((x: number) => x % 2 === 0),
    map((x: number) => x * factor),
  );
}

source$.pipe(multiplyEven(10)).subscribe();

Build reusable operators.

Common Patterns

Typeahead / Autocomplete

const search$ = fromEvent(searchBox, "keyup").pipe(
  map((e: any) => e.target.value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap((term) => (term.length > 2 ? api.search(term) : of([]))),
);

Search with debouncing.

Unsubscribe Pattern

private destroy$ = new Subject<void>();

ngOnInit() {
  this.data$.pipe(
    takeUntil(this.destroy$)
  ).subscribe(data => ...);
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

Angular component cleanup.

HTTP with Retry

api.fetch("/data").pipe(
  retry({ count: 3, delay: 1000 }),
  catchError((err) => of({ error: true })),
);

Resilient HTTP requests.

Polling

import { timer, switchMap } from "rxjs";

timer(0, 5000)
  .pipe(
    // Immediately, then every 5s
    switchMap(() => api.fetch("/data")),
  )
  .subscribe();

Periodic data fetching.

State Management

class Store {
  private state$ = new BehaviorSubject<State>(initialState);

  getState() {
    return this.state$.asObservable();
  }

  update(partial: Partial<State>) {
    this.state$.next({ ...this.state$.value, ...partial });
  }
}

BehaviorSubject for state.

Schedulers

Scheduler When Use Case
asyncScheduler setTimeout Default async
asapScheduler Microtask (Promise) Before next paint
animationFrameScheduler requestAnimationFrame Animations
queueScheduler Synchronous, recursive Breadth-first recursion

Control execution timing.

v7 Changes

Change Before After
toPromise source$.toPromise() firstValueFrom(source$) / lastValueFrom(source$)
retryWhen retryWhen(errors => ...) retry({ count: 3, delay: 1000 })
pluck pluck('name') map(x => x.name)
Imports import { map } from 'rxjs/operators' Also: import { map } from 'rxjs' (v7.2+)

Breaking changes in RxJS v7.

Gotchas

Common Pitfalls

  • combineLatest won't emit until ALL sources emit — use startWith() for defaults
  • forkJoin never emits if inner observable doesn't complete — ensure all complete
  • Memory leaks from unsubscribed observables — use takeUntil pattern
  • Nested subscribes are anti-patterns — use higher-order mapping operators instead

More Pitfalls

  • catchError placement matters — inside inner = stream continues; outside = stream ends
  • Observables are cold by default — use share() / shareReplay() for hot behavior
  • mergeMap for sequential ops is wrong — use concatMap when order matters

Also see