Basics
Observable
import { Observable } from "rxjs";
const source$ = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
const subscription = source$.subscribe({
next: (val) => console.log(val),
error: (err) => console.error(err),
complete: () => console.log("Done"),
});
subscription.unsubscribe();
Create observables with custom logic.
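Custom logic usually also needs cleanup. As a minimal sketch (the `ticker$` and `log` names are made up for illustration), the function passed to `new Observable` can return a teardown function, which runs when the subscription ends:

```typescript
import { Observable } from "rxjs";

const log: string[] = [];

// Hypothetical producer: emits two values synchronously and stays open.
const ticker$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  // The returned function is the teardown logic for this subscription.
  return () => {
    log.push("teardown");
  };
});

const sub = ticker$.subscribe((v) => log.push(`value ${v}`));
sub.unsubscribe(); // triggers the teardown
// log: ["value 1", "value 2", "teardown"]
```

This is where timers, sockets, or event listeners would be released.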
Pipe Pattern
import { of } from "rxjs";
import { map, filter, tap } from "rxjs/operators";
of(1, 2, 3, 4, 5)
.pipe(
filter((x) => x % 2 === 0), // Keep even
map((x) => x * 10), // Transform
tap((x) => console.log(x)), // Side effect
)
.subscribe();
// Output: 20, 40
Chain operators with .pipe().
Subjects
| Type | Initial Value | Replays | On Complete |
|---|---|---|---|
| Subject | ❌ None | ❌ None | Emits nothing |
| BehaviorSubject | ✅ Required | Last value (1) | Emits nothing to late subscribers |
| ReplaySubject | ❌ None | Last N values | Replays buffer to late subscribers |
| AsyncSubject | ❌ None | ❌ None | Emits ONLY last value |
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from "rxjs";
const subject = new Subject<number>();
const behavior = new BehaviorSubject<number>(0); // Requires initial value
const replay = new ReplaySubject<number>(3); // Replays last 3
const async = new AsyncSubject<number>(); // Only emits on complete
Subjects are both observable and observer.
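The replay differences are easiest to see with a late subscriber. A small sketch, assuming a hypothetical `lateSubscriber` helper that pushes three values, subscribes, pushes a fourth, then completes:

```typescript
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from "rxjs";

// Hypothetical helper: records what a subscriber sees when it joins late.
function lateSubscriber(subject: Subject<number>): number[] {
  const seen: number[] = [];
  subject.next(1);
  subject.next(2);
  subject.next(3);
  subject.subscribe((v) => seen.push(v)); // joins after three emissions
  subject.next(4);
  subject.complete();
  return seen;
}

const fromSubject = lateSubscriber(new Subject<number>()); // [4]
const fromBehavior = lateSubscriber(new BehaviorSubject<number>(0)); // [3, 4]
const fromReplay = lateSubscriber(new ReplaySubject<number>(2)); // [2, 3, 4]
const fromAsync = lateSubscriber(new AsyncSubject<number>()); // [4], delivered on complete
```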
Creation Operators
Common Creators
import {
of,
from,
fromEvent,
interval,
timer,
range,
defer,
EMPTY,
NEVER,
throwError,
} from "rxjs";
of(1, 2, 3); // Emit values
from([1, 2, 3]); // From array
from(fetch("/api")); // From promise
fromEvent(document, "click"); // From DOM event
interval(1000); // Every 1s: 0, 1, 2...
timer(2000); // After 2s: 0, complete
timer(2000, 1000); // After 2s, then every 1s
range(1, 5); // 1, 2, 3, 4, 5
Create observables from various sources.
Utility Creators
defer(() => from(fetch("/api"))); // Lazy — creates on subscribe
EMPTY; // Completes immediately
NEVER; // Never emits or completes
throwError(() => new Error("oops")); // Emits error
Special creators for edge cases.
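To illustrate what "lazy" means for `defer`, here is a rough sketch using a made-up `calls` counter: the argument to `of` is evaluated once when the line runs, while the `defer` factory runs again for every subscriber.

```typescript
import { defer, of } from "rxjs";

let calls = 0;

const eager$ = of(++calls); // argument evaluated right now: always emits 1
const lazy$ = defer(() => of(++calls)); // factory runs on each subscribe

const out: number[] = [];
lazy$.subscribe((v) => out.push(v)); // 2
lazy$.subscribe((v) => out.push(v)); // 3
eager$.subscribe((v) => out.push(v)); // 1
// out: [2, 3, 1]
```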
ajax
import { ajax } from "rxjs/ajax";
ajax.getJSON("/api/users").subscribe((data) => console.log(data));
ajax({
url: "/api/users",
method: "POST",
headers: { "Content-Type": "application/json" },
body: { name: "John" },
}).subscribe((response) => console.log(response));
HTTP requests as observables.
Transformation Operators
map / scan
import { map, scan, reduce, pairwise, toArray } from "rxjs/operators";
// Transform each value
source$.pipe(map((x) => x * 2));
// Running accumulator (emits each step)
source$.pipe(scan((acc, val) => acc + val, 0));
// 1, 3, 6, 10...
// Final accumulator (emits once on complete)
source$.pipe(reduce((acc, val) => acc + val, 0));
// Previous + current pair
source$.pipe(pairwise());
// [1,2], [2,3], [3,4]...
// Collect all into array
source$.pipe(toArray());
Transform values in the stream.
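A runnable comparison may help here. This sketch assumes a synchronous source `of(1, 2, 3, 4)` and a hypothetical `collect` helper that gathers emissions into an array:

```typescript
import { Observable, of } from "rxjs";
import { scan, reduce, pairwise, toArray } from "rxjs/operators";

// Hypothetical helper: only valid for sources that emit synchronously.
function collect<T>(source$: Observable<T>): T[] {
  const out: T[] = [];
  source$.subscribe((v) => out.push(v));
  return out;
}

const numbers$ = of(1, 2, 3, 4);
const sum = (acc: number, val: number) => acc + val;

const running = collect(numbers$.pipe(scan(sum, 0))); // [1, 3, 6, 10]
const total = collect(numbers$.pipe(reduce(sum, 0))); // [10]
const pairs = collect(numbers$.pipe(pairwise())); // [[1, 2], [2, 3], [3, 4]]
const all = collect(numbers$.pipe(toArray())); // [[1, 2, 3, 4]]
```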
Higher-Order Mapping
⭐ Most Important
import { switchMap, mergeMap, concatMap, exhaustMap } from "rxjs/operators";
// switchMap: Cancel previous inner (LATEST WINS)
search$.pipe(switchMap((term) => api.search(term)));
// Use for: typeahead, autocomplete, route changes
// mergeMap: Run all inner in parallel
clicks$.pipe(mergeMap(() => api.logClick()));
// Use for: independent parallel operations
// concatMap: Queue inner sequentially
saves$.pipe(concatMap((data) => api.save(data)));
// Use for: ordered operations (order matters)
// exhaustMap: Ignore while inner active
button$.pipe(exhaustMap(() => api.submit()));
// Use for: prevent duplicate submissions
Flatten higher-order observables.
Comparison
| Operator | Concurrent | Cancels Prev | Order | Best For |
|---|---|---|---|---|
| switchMap | 1 at a time | ✅ Yes | Latest | Typeahead, latest request |
| mergeMap | Unlimited | ❌ No | Any | Parallel operations |
| concatMap | 1 at a time | ❌ No (queues) | Sequential | Order-sensitive ops |
| exhaustMap | 1 at a time | ❌ No (ignores) | First | Button click guards |
Choose the right mapping operator.
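The differences can be traced by hand. The sketch below is only an illustration: `trace`, `Key`, and the scripted sequence are hypothetical, and the inner observables are Subjects so that timing is controlled manually instead of with real timers.

```typescript
import { Observable, OperatorFunction, Subject } from "rxjs";
import { switchMap, mergeMap, concatMap, exhaustMap } from "rxjs/operators";

type Key = "a" | "b";
type Project = (key: Key) => Observable<string>;

// Replays the same scripted sequence through a given flattening operator.
function trace(flatten: (project: Project) => OperatorFunction<Key, string>): string[] {
  const outer$ = new Subject<Key>();
  const inner = { a: new Subject<string>(), b: new Subject<string>() };
  const out: string[] = [];
  outer$.pipe(flatten((key) => inner[key])).subscribe((v) => out.push(v));

  outer$.next("a");
  inner.a.next("a1");
  outer$.next("b"); // arrives while inner "a" is still active
  inner.a.next("a2");
  inner.b.next("b1");
  inner.a.complete();
  inner.b.next("b2");
  inner.b.complete();
  return out;
}

const viaSwitch = trace((p) => switchMap(p)); // ["a1", "b1", "b2"]
const viaMerge = trace((p) => mergeMap(p)); // ["a1", "a2", "b1", "b2"]
const viaConcat = trace((p) => concatMap(p)); // ["a1", "a2", "b2"]
const viaExhaust = trace((p) => exhaustMap(p)); // ["a1", "a2"]
```

Because the inner Subjects are hot, `concatMap` misses "b1" (it subscribes to "b" only after "a" completes) and `exhaustMap` never subscribes to "b" at all.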
buffer / window
import { bufferTime, bufferCount } from "rxjs/operators";
source$.pipe(bufferTime(1000)); // Collect for 1s, emit array
source$.pipe(bufferCount(5)); // Collect 5, emit array
Collect emissions into arrays.
Filtering Operators
Time-Based Filters
import {
debounceTime,
throttleTime,
auditTime,
sampleTime,
distinctUntilChanged,
} from "rxjs/operators";
// Wait for silence (typeahead)
input$.pipe(debounceTime(300));
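// Emit the first value, then ignore the rest for 300ms
input$.pipe(throttleTime(300));
// After a value arrives, wait 300ms, then emit the latest one
input$.pipe(auditTime(300));
// Emit the latest value on a fixed 300ms tick (if there is a new one)
input$.pipe(sampleTime(300));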
// Only if value changed
input$.pipe(distinctUntilChanged());
// Custom comparison
input$.pipe(distinctUntilChanged((prev, curr) => prev.id === curr.id));
Control emission timing.
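To compare these operators without waiting on real timers, one option is virtual time via `TestScheduler` from `rxjs/testing`. This is a sketch under assumptions: the marble string and the 3ms windows are made up, and in `run` mode each marble character stands for 1 virtual millisecond.

```typescript
import { Observable } from "rxjs";
import { debounceTime, throttleTime, auditTime } from "rxjs/operators";
import { TestScheduler } from "rxjs/testing";

// Virtual-time scheduler; the assertion callback is unused in this sketch.
const scheduler = new TestScheduler(() => {});

const debounced: string[] = [];
const throttled: string[] = [];
const audited: string[] = [];

scheduler.run(({ cold }) => {
  // a at 0ms, b at 1ms, c at 5ms, complete at 10ms.
  const keys$: Observable<string> = cold("ab---c----|");
  keys$.pipe(debounceTime(3)).subscribe((v) => debounced.push(v));
  keys$.pipe(throttleTime(3)).subscribe((v) => throttled.push(v));
  keys$.pipe(auditTime(3)).subscribe((v) => audited.push(v));
});
// debounced: ["b", "c"]  (value emitted after 3ms of silence)
// throttled: ["a", "c"]  (first value of each window)
// audited:   ["b", "c"]  (latest value when each window closes)
```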
Value Filters
import {
filter,
take,
takeUntil,
takeWhile,
skip,
first,
last,
single,
elementAt,
} from "rxjs/operators";
source$.pipe(filter((x) => x > 5)); // Predicate filter
source$.pipe(take(3)); // First 3 then complete
source$.pipe(takeUntil(destroy$)); // Until notifier emits ⭐
source$.pipe(takeWhile((x) => x < 10)); // While predicate true
source$.pipe(skip(2)); // Skip first 2
source$.pipe(first()); // First value + complete
source$.pipe(last()); // Last value (on complete)
source$.pipe(elementAt(2)); // Value at index 2
Filter emissions by condition.
distinct
import {
distinct,
distinctUntilChanged,
distinctUntilKeyChanged,
} from "rxjs/operators";
source$.pipe(distinct()); // All-time unique
source$.pipe(distinctUntilChanged()); // Consecutive dupes
source$.pipe(distinctUntilKeyChanged("name")); // By property
Remove duplicate values.
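A quick side-by-side, as a sketch with made-up sample data and a hypothetical `collect` helper for synchronous sources:

```typescript
import { Observable, from } from "rxjs";
import { distinct, distinctUntilChanged, distinctUntilKeyChanged, map } from "rxjs/operators";

// Hypothetical helper: only valid for sources that emit synchronously.
function collect<T>(source$: Observable<T>): T[] {
  const out: T[] = [];
  source$.subscribe((v) => out.push(v));
  return out;
}

const values$ = from([1, 1, 2, 1, 3, 3]);
const allTimeUnique = collect(values$.pipe(distinct())); // [1, 2, 3]
const noConsecutiveDupes = collect(values$.pipe(distinctUntilChanged())); // [1, 2, 1, 3]

const users$ = from([
  { id: 1, name: "Ann" },
  { id: 2, name: "Ann" },
  { id: 3, name: "Bob" },
]);
const idsOnNameChange = collect(
  users$.pipe(
    distinctUntilKeyChanged("name"),
    map((u) => u.id),
  ),
); // [1, 3]
```

Note that `distinct()` remembers every value it has seen, so on long-lived streams it can grow without bound.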
Combination Operators
combineLatest
import { combineLatest, forkJoin, merge, concat, zip, race } from "rxjs";
import { withLatestFrom, startWith } from "rxjs/operators";
// Emit when ANY emits (needs all to emit once first)
combineLatest([obs1$, obs2$, obs3$]).subscribe(([val1, val2, val3]) =>
console.log(val1, val2, val3),
);
⚠️ Won't emit until ALL sources have emitted at least once. Use startWith() for initial values.
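The effect of `startWith()` can be shown with two Subjects. In this sketch, `a$`, `b$`, and the "-" placeholder are illustrative names only:

```typescript
import { combineLatest, Subject } from "rxjs";
import { startWith } from "rxjs/operators";

const a$ = new Subject<number>();
const b$ = new Subject<string>();

const withoutSeed: string[] = [];
const withSeed: string[] = [];

combineLatest([a$, b$]).subscribe(([a, b]) => withoutSeed.push(`${a}${b}`));
combineLatest([a$, b$.pipe(startWith("-"))]).subscribe(([a, b]) =>
  withSeed.push(`${a}${b}`),
);

a$.next(1);
a$.next(2);
b$.next("x");
// withoutSeed: ["2x"]            (silent until b$ emits)
// withSeed:    ["1-", "2-", "2x"] (b$ is seeded with "-")
```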
forkJoin
// Wait for ALL to complete, emit last values
forkJoin({
users: http.get("/users"),
posts: http.get("/posts"),
}).subscribe(({ users, posts }) => {
console.log(users, posts);
});
Like Promise.all for observables. ⚠️ Never emits if any inner observable doesn't complete!
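A small self-contained sketch of both behaviors, using `of` and `NEVER` as stand-ins for real requests (the data is made up):

```typescript
import { forkJoin, of, NEVER } from "rxjs";

const joined: Array<{ users: string[]; count: number }> = [];
forkJoin({
  users: of(["ann"], ["ann", "bob"]), // only the last value is kept
  count: of(1, 2, 3),
}).subscribe((result) => joined.push(result));
// joined: [{ users: ["ann", "bob"], count: 3 }]

let emitted = false;
forkJoin([of(1), NEVER]).subscribe(() => {
  emitted = true;
});
// emitted stays false: NEVER does not complete, so forkJoin never emits
```

For long-lived sources, adding `take(1)` or `first()` to each input is a common way to guarantee completion.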
merge / concat / zip / race
// merge: Emit all values as they arrive (interleaved)
merge(obs1$, obs2$).subscribe((val) => console.log(val));
// concat: Sequential (subscribe to next after previous completes)
concat(obs1$, obs2$).subscribe((val) => console.log(val));
// zip: Pair values by index
zip(obs1$, obs2$).subscribe(([a, b]) => console.log(a, b));
// race: First to emit wins, others unsubscribed
race(obs1$, obs2$).subscribe((val) => console.log(val));
// withLatestFrom: Combine with latest from another
source$
  .pipe(withLatestFrom(other$))
  .subscribe(([sourceVal, otherVal]) => console.log(sourceVal, otherVal));
// startWith: Prepend initial value
source$.pipe(startWith(0)).subscribe((val) => console.log(val));
Combine multiple observables.
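To make the pairing rules concrete, here is a sketch that feeds the same hand-driven sequence into `merge`, `zip`, and `withLatestFrom` (the Subjects and values are illustrative):

```typescript
import { merge, zip, Subject } from "rxjs";
import { withLatestFrom } from "rxjs/operators";

const a$ = new Subject<number>();
const b$ = new Subject<string>();

const merged: Array<number | string> = [];
const zipped: Array<[number, string]> = [];
const latest: Array<[number, string]> = [];

merge(a$, b$).subscribe((v) => merged.push(v));
zip(a$, b$).subscribe((pair) => zipped.push(pair));
a$.pipe(withLatestFrom(b$)).subscribe((pair) => latest.push(pair));

a$.next(1);
b$.next("x");
a$.next(2);
a$.next(3);
b$.next("y");
// merged: [1, "x", 2, 3, "y"]   (arrival order)
// zipped: [[1, "x"], [2, "y"]]  (paired by index; 3 waits for a partner)
// latest: [[2, "x"], [3, "x"]]  (1 is dropped: b$ had no value yet)
```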
Error Handling
catchError
import {
catchError,
retry,
finalize,
throwIfEmpty,
timeout,
} from "rxjs/operators";
import { of, EMPTY } from "rxjs";
// Return fallback on error
source$.pipe(catchError((err) => of("fallback value")));
// Retry N times
source$.pipe(retry(3));
// Retry with config (v7+)
source$.pipe(retry({ count: 3, delay: 1000 }));
// Timeout
source$.pipe(
timeout(5000), // Error if no emit in 5s
);
// Cleanup (always runs)
source$.pipe(finalize(() => console.log("Done or Error")));
Handle errors gracefully.
Error Placement Matters
// ✅ Inside switchMap: outer stream continues after an inner error
source$.pipe(
switchMap((val) =>
api.fetch(val).pipe(
catchError((err) => of(null)), // Fallback per request
),
),
);
// ⚠️ Outside switchMap: stream ends on error
source$.pipe(
switchMap((val) => api.fetch(val)),
catchError((err) => EMPTY), // Entire stream ends
);
Placement affects stream lifecycle.
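A runnable version of the two placements, as a sketch: `fetchValue` is a hypothetical stand-in for `api.fetch` that fails for the value 2.

```typescript
import { EMPTY, Observable, of, throwError } from "rxjs";
import { catchError, switchMap } from "rxjs/operators";

// Hypothetical request: fails for the value 2, otherwise returns n * 10.
const fetchValue = (n: number): Observable<number> =>
  n === 2 ? throwError(() => new Error("boom")) : of(n * 10);

const inside: Array<number | null> = [];
of(1, 2, 3)
  .pipe(switchMap((n) => fetchValue(n).pipe(catchError(() => of(null)))))
  .subscribe((v) => inside.push(v));
// inside: [10, null, 30]  (the failed request is replaced, 3 is still processed)

const outside: number[] = [];
of(1, 2, 3)
  .pipe(
    switchMap((n) => fetchValue(n)),
    catchError(() => EMPTY),
  )
  .subscribe((v) => outside.push(v));
// outside: [10]  (the error ends the stream, 3 is never processed)
```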
Multicasting
share / shareReplay
import { share, shareReplay } from "rxjs/operators";
// Share execution among subscribers
const shared$ = source$.pipe(share());
// Share + replay last N values to late subscribers
const cached$ = source$.pipe(shareReplay(1));
// Common: cache HTTP response
const user$ = http
.get("/api/user")
.pipe(shareReplay({ bufferSize: 1, refCount: true }));
Convert cold observables to hot.
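One way to see the caching effect is to count executions. In this sketch, `request$` is a made-up stand-in for an HTTP call that completes after one value:

```typescript
import { Observable } from "rxjs";
import { shareReplay } from "rxjs/operators";

let executions = 0;
// Hypothetical one-shot request: counts how often it actually runs.
const request$ = new Observable<string>((subscriber) => {
  executions++;
  subscriber.next("user");
  subscriber.complete();
});

request$.subscribe();
request$.subscribe();
const coldExecutions = executions; // 2: each subscriber triggers the request

executions = 0;
const cached$ = request$.pipe(shareReplay(1));
const received: string[] = [];
cached$.subscribe((v) => received.push(v));
cached$.subscribe((v) => received.push(v)); // served from the replay buffer
const cachedExecutions = executions; // 1
// received: ["user", "user"]
```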
Cold vs Hot
Cold Observable:                    Hot Observable:
┌─ subscribe → starts execution     ┌─ already running
│  Each subscriber gets own copy    │  Subscribers share execution
│  Like watching video on demand    │  Like live TV broadcast
└─ of(), from(), ajax()             └─ Subject, fromEvent(), share()
Understand execution models.
Utility Operators
tap
import { tap, map, delay, observeOn, subscribeOn } from "rxjs/operators";
import { asyncScheduler } from "rxjs";
// Side effects (logging, debugging)
source$.pipe(
tap((val) => console.log("Before:", val)),
map((val) => val * 2),
tap((val) => console.log("After:", val)),
);
// Delay emissions
source$.pipe(delay(1000));
// Schedule execution
source$.pipe(observeOn(asyncScheduler));
Side effects without altering values.
firstValueFrom / lastValueFrom
import { firstValueFrom, lastValueFrom } from "rxjs";
// Convert observable to promise (replaces deprecated toPromise)
const value = await firstValueFrom(source$);
const last = await lastValueFrom(source$);
Convert observables to promises.
Custom Operators
import { pipe } from "rxjs";
import { filter, map } from "rxjs/operators";
// Create reusable operator
function multiplyEven(factor: number) {
return pipe(
filter((x: number) => x % 2 === 0),
map((x: number) => x * factor),
);
}
source$.pipe(multiplyEven(10)).subscribe();
Build reusable operators.
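When composing existing operators with `pipe` is not enough, an operator can also be written as a plain function from one Observable to another. A minimal sketch, where `filterNullish` is a hypothetical name and not an RxJS export:

```typescript
import { Observable, OperatorFunction, of } from "rxjs";

// Hypothetical operator: drops null and undefined, narrowing the type to T.
function filterNullish<T>(): OperatorFunction<T | null | undefined, T> {
  return (source) =>
    new Observable<T>((subscriber) =>
      // Returning the inner subscription lets unsubscribe propagate upstream.
      source.subscribe({
        next: (value) => {
          if (value !== null && value !== undefined) subscriber.next(value);
        },
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      }),
    );
}

const kept: number[] = [];
of(1, null, 2, undefined, 3)
  .pipe(filterNullish<number>())
  .subscribe((v) => kept.push(v));
// kept: [1, 2, 3]
```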
Common Patterns
Typeahead / Autocomplete
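import { fromEvent, of } from "rxjs";
import { map, debounceTime, distinctUntilChanged, switchMap } from "rxjs/operators";
// Assumes searchBox (an input element) and api.search exist in scope
const search$ = fromEvent(searchBox, "keyup").pipe(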
map((e: any) => e.target.value),
debounceTime(300),
distinctUntilChanged(),
switchMap((term) => (term.length > 2 ? api.search(term) : of([]))),
);
Search with debouncing.
Unsubscribe Pattern
private destroy$ = new Subject<void>();
ngOnInit() {
this.data$.pipe(
takeUntil(this.destroy$)
).subscribe(data => console.log(data));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
Angular component cleanup.
HTTP with Retry
api.fetch("/data").pipe(
retry({ count: 3, delay: 1000 }),
catchError((err) => of({ error: true })),
);
Resilient HTTP requests.
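The retry count can be checked with a fake request. This sketch uses made-up `flaky$` and `alwaysFails$` sources and omits the `delay` option so everything runs synchronously:

```typescript
import { defer, of, throwError } from "rxjs";
import { catchError, retry } from "rxjs/operators";

let attempts = 0;
// Hypothetical flaky request: fails twice, succeeds on the third subscription.
const flaky$ = defer(() => {
  attempts++;
  return attempts < 3 ? throwError(() => new Error("network")) : of("ok");
});

const recovered: string[] = [];
flaky$
  .pipe(retry(3), catchError(() => of("fallback")))
  .subscribe((v) => recovered.push(v));
// recovered: ["ok"], attempts: 3

let failures = 0;
// Hypothetical request that never succeeds.
const alwaysFails$ = defer(() => {
  failures++;
  return throwError(() => new Error("down"));
});

const gaveUp: string[] = [];
alwaysFails$
  .pipe(retry(2), catchError(() => of("fallback")))
  .subscribe((v) => gaveUp.push(v));
// gaveUp: ["fallback"], failures: 3 (1 initial try + 2 retries)
```

`retry` works by resubscribing, so it only helps when the source is cold and each subscription starts a fresh request.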
Polling
import { timer, switchMap } from "rxjs";
timer(0, 5000)
.pipe(
// Immediately, then every 5s
switchMap(() => api.fetch("/data")),
)
.subscribe();
Periodic data fetching.
State Management
class Store {
private state$ = new BehaviorSubject<State>(initialState);
getState() {
return this.state$.asObservable();
}
update(partial: Partial<State>) {
this.state$.next({ ...this.state$.value, ...partial });
}
}
BehaviorSubject for state.
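A common extension is a per-key selector that only emits when that slice changes. The sketch below is one possible shape, not a prescribed design: `CounterStore`, `AppState`, and `select` are hypothetical names.

```typescript
import { BehaviorSubject, Observable } from "rxjs";
import { distinctUntilChanged, map } from "rxjs/operators";

interface AppState {
  count: number;
  user: string;
}

class CounterStore {
  private state$: BehaviorSubject<AppState>;

  constructor(initial: AppState) {
    this.state$ = new BehaviorSubject<AppState>(initial);
  }

  // Emits the current value of one key, then only when that key changes.
  select<K extends keyof AppState>(key: K): Observable<AppState[K]> {
    return this.state$.pipe(
      map((state) => state[key]),
      distinctUntilChanged(),
    );
  }

  update(partial: Partial<AppState>): void {
    this.state$.next({ ...this.state$.value, ...partial });
  }
}

const store = new CounterStore({ count: 0, user: "anon" });
const counts: number[] = [];
store.select("count").subscribe((c) => counts.push(c));

store.update({ count: 1 });
store.update({ user: "ada" }); // count unchanged, so nothing is emitted
store.update({ count: 2 });
// counts: [0, 1, 2]
```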
Schedulers
| Scheduler | When | Use Case |
|---|---|---|
| asyncScheduler | setTimeout | Default async |
| asapScheduler | Microtask (Promise) | Before next paint |
| animationFrameScheduler | requestAnimationFrame | Animations |
| queueScheduler | Synchronous, recursive | Breadth-first recursion |
Control execution timing.
v7 Changes
| Change | Before | After |
|---|---|---|
| toPromise | source$.toPromise() | firstValueFrom(source$) / lastValueFrom(source$) |
| retryWhen | retryWhen(errors => ...) | retry({ count: 3, delay: 1000 }) |
| pluck | pluck('name') | map(x => x.name) |
| Imports | import { map } from 'rxjs/operators' | Also: import { map } from 'rxjs' (v7.2+) |
Deprecations and import changes in RxJS v7.
Gotchas
Common Pitfalls
- combineLatest won't emit until ALL sources emit: use startWith() for defaults
- forkJoin never emits if an inner observable doesn't complete: ensure all complete
- Memory leaks from subscriptions that are never unsubscribed: use the takeUntil pattern
- Nested subscribes are an anti-pattern: use higher-order mapping operators instead
More Pitfalls
- catchError placement matters: inside the inner observable, the stream continues; outside, the stream ends
- Observables are cold by default: use share() / shareReplay() for hot behavior
- mergeMap does not preserve order for sequential operations: use concatMap when order matters
Also see
- RxJS docs (rxjs.dev)
- LearnRxJS (learnrxjs.io)
- RxJS Marbles (rxmarbles.com)
- Angular University RxJS guide (angular-university.io)