Search

RxJS Cho Angular Dev: Operators Cần Biết Và Anti-Patterns

RxJS Cho Angular Dev: Operators Cần Biết Và Anti-Patterns

RxJS là phần khó nuốt nhất khi học Angular. Component, template, routing — hiểu trong vài ngày. Nhưng RxJS? Mình mất cả tháng mới thực sự hiểu tại sao switchMap khác mergeMap, tại sao subscribe trong subscribe là sai, và tại sao component unmount rồi mà HTTP request vẫn chạy.

Vấn đề là hầu hết tutorial dạy RxJS bằng marble diagram và ví dụ trừu tượng — interval, timer, fromEvent. Nhưng Angular dev cần RxJS cho những thứ cụ thể: gọi API, handle form input, combine nhiều data source, cleanup khi component destroy. Bài viết này focus vào đúng những thứ đó — operators thực sự dùng hàng ngày trong Angular project, và anti-patterns mình từng mắc (và thấy đồng nghiệp mắc).


Observable và Subscription — Hiểu đúng trước khi dùng

Observable là lazy — nó không chạy gì cho đến khi có người subscribe. Khác với Promise (chạy ngay khi tạo):

// Promise — chạy ngay
const promise = fetch('/api/invoices'); // HTTP request ĐÃ gửi

// Observable — chưa chạy gì
const obs$ = this.http.get('/api/invoices'); // Chưa gửi request
obs$.subscribe(); // BÂY GIỜ mới gửi

Mỗi lần subscribe tạo execution mới. Subscribe 3 lần = 3 HTTP requests:

const invoices$ = this.http.get('/api/invoices');
invoices$.subscribe(); // Request 1
invoices$.subscribe(); // Request 2
invoices$.subscribe(); // Request 3

Đây là nguồn gốc của nhiều bug: "Tại sao API bị gọi 2 lần?" — vì subscribe 2 lần (một lần trong component, một lần trong template bằng async pipe).

Convention: suffix $ cho Observable variables (invoices$, user$, loading$). Giúp nhận biết ngay biến nào là Observable cần subscribe/unsubscribe.


Operators mà Angular dev dùng hàng ngày

switchMap — Operator quan trọng nhất

switchMap cancel subscription cũ khi có giá trị mới. Use case phổ biến nhất: search-as-you-type.

// Search component
this.searchControl.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => this.invoiceService.search(term))
).subscribe(results => {
  this.searchResults = results;
});

User gõ "Mic" → API call cho "Mic". User gõ thêm "h" thành "Mich" → switchMap CANCEL request "Mic" (chưa trả về), tạo request mới cho "Mich". User gõ thêm "elin" → cancel "Mich", request "Michelin".

Tại sao cancel? Vì kết quả "Mic" không còn relevant khi user đã gõ "Mich". Nếu dùng mergeMap, cả 3 requests chạy song song, response nào về trước hiển thị trước — có thể "Mic" về sau "Mich" → UI hiển thị kết quả cũ. Đó là race condition.

switchMap dùng cho: search/autocomplete, route parameter thay đổi (load data theo route), dropdown selection trigger API call — bất kỳ lúc nào giá trị mới làm giá trị cũ trở nên vô nghĩa.

mergeMap (flatMap) — Khi cần tất cả kết quả

mergeMap KHÔNG cancel — tất cả inner Observables chạy song song:

// Upload nhiều files — cần TẤT CẢ upload hoàn thành
this.selectedFiles$.pipe(
  mergeMap(file => this.uploadService.upload(file))
).subscribe(response => {
  this.uploadedFiles.push(response);
});

Upload file A, B, C — cả 3 chạy song song, không cancel cái nào. Dùng mergeMap khi mọi execution đều quan trọng và không nên bị cancel.

concatMap — Tuần tự, từng cái một

concatMap đợi inner Observable hoàn thành rồi mới chạy cái tiếp:

// Tạo invoices tuần tự — đảm bảo invoice number tăng dần
this.invoicesToCreate$.pipe(
  concatMap(data => this.invoiceService.create(data))
).subscribe(invoice => {
  console.log(`Created: ${invoice.invoiceNumber}`);
  // INV-001, INV-002, INV-003 — đúng thứ tự
});

Dùng khi thứ tự quan trọng: auto-increment ID, sequential operations, khi operation sau phụ thuộc vào kết quả operation trước.

exhaustMap — Bỏ qua trong khi đang xử lý

exhaustMap bỏ qua giá trị mới nếu inner Observable chưa hoàn thành:

// Submit form — ngăn double submit
this.submitButton$.pipe(
  exhaustMap(() => this.invoiceService.create(this.form.value))
).subscribe(result => {
  this.router.navigate(['/invoices', result.id]);
});

User click Submit → API call bắt đầu. User click Submit lần 2 (impatient) → exhaustMap BỎ QUA click thứ 2 vì request đầu chưa xong. Không cần disable button hay loading flag — exhaustMap xử lý ở stream level.

Tóm tắt bốn map operators

switchMap cancel cũ, dùng mới — cho search, route change. mergeMap chạy song song tất cả — cho batch operations. concatMap chạy tuần tự — cho sequential operations. exhaustMap bỏ qua khi busy — cho form submit, prevent double click.


Combination operators

combineLatest — Khi cần nhiều data source

// Dashboard cần cả invoices và customers
invoices$ = this.invoiceService.getAll();
customers$ = this.customerService.getAll();
selectedStatus$ = this.statusFilter.valueChanges;

vm$ = combineLatest([
  this.invoices$,
  this.customers$,
  this.selectedStatus$
]).pipe(
  map(([invoices, customers, status]) => ({
    invoices: status
      ? invoices.filter(i => i.status === status)
      : invoices,
    customers,
    totalAmount: invoices
      .filter(i => !status || i.status === status)
      .reduce((sum, i) => sum + i.totalAmount, 0)
  }))
);
<!-- Template dùng async pipe -->
<div *ngIf="vm$ | async as vm">
  <app-invoice-table [invoices]="vm.invoices" />
  <app-stats [total]="vm.totalAmount" />
</div>

combineLatest emit khi BẤT KỲ source nào emit — nhưng chỉ sau khi TẤT CẢ sources đã emit ít nhất 1 lần. Dùng cho: dashboard cần nhiều data, filter + data combination, form dependent trên nhiều inputs.

forkJoin — Đợi tất cả hoàn thành

// Load initial data — cần TẤT CẢ trước khi render
forkJoin({
  invoices: this.invoiceService.getAll(),
  customers: this.customerService.getAll(),
  products: this.productService.getAll()
}).subscribe(({ invoices, customers, products }) => {
  this.invoices = invoices;
  this.customers = customers;
  this.products = products;
  this.loading = false;
});

forkJoin giống Promise.all — đợi tất cả Observable hoàn thành, emit một lần duy nhất với tất cả kết quả. Khác combineLatest: forkJoin chỉ emit cuối cùng, combineLatest emit mỗi khi có thay đổi.

Dùng cho: initial data loading, parallel API calls cần tất cả kết quả.

Cẩn thận: forkJoin cần tất cả Observable COMPLETE. Nếu một Observable không bao giờ complete (ví dụ WebSocket stream), forkJoin không bao giờ emit. Chỉ dùng với Observable có end (HTTP requests, take(1)).


Utility operators

tap — Side effect mà không đổi data

this.invoiceService.getAll().pipe(
  tap(invoices => console.log('Loaded', invoices.length, 'invoices')),
  tap(() => this.loading = false),
  tap(invoices => this.analytics.track('invoices_loaded', {
    count: invoices.length
  }))
).subscribe(invoices => {
  this.invoices = invoices;
});

tap chạy side effect (logging, set flag, analytics) mà không thay đổi data flowing qua stream. Debug tool cực kỳ hữu ích — thêm tap(x => console.log(x)) vào bất kỳ đâu trong pipe để xem data tại điểm đó.

catchError — Handle lỗi trong stream

this.invoiceService.getAll().pipe(
  catchError(error => {
    console.error('Failed to load invoices', error);
    this.notificationService.showError('Không thể tải danh sách hóa đơn');
    return of([]); // Trả về empty array thay vì break stream
  })
).subscribe(invoices => {
  this.invoices = invoices; // [] nếu lỗi
});

catchError bắt error và trả về Observable thay thế. Nếu không có catchError, error sẽ terminate stream — subscriber nhận error, stream chết, không emit gì nữa.

Quan trọng: trong switchMap / mergeMap, đặt catchError bên trong inner Observable:

// ĐÚNG — error chỉ kill inner Observable, outer stream vẫn sống
this.searchControl.valueChanges.pipe(
  switchMap(term =>
    this.invoiceService.search(term).pipe(
      catchError(() => of([]))  // catchError TRONG switchMap
    )
  )
).subscribe(results => this.results = results);

// SAI — error kill toàn bộ stream, search không hoạt động nữa
this.searchControl.valueChanges.pipe(
  switchMap(term => this.invoiceService.search(term)),
  catchError(() => of([]))  // catchError NGOÀI switchMap
).subscribe(results => this.results = results);
// Sau lần lỗi đầu tiên, gõ gì cũng không search nữa

startWith — Giá trị khởi tạo

// Filter form — cần giá trị ban đầu để combineLatest emit
selectedStatus$ = this.statusFilter.valueChanges.pipe(
  startWith('all')  // Emit 'all' ngay lập tức, không đợi user chọn
);

startWith quan trọng khi dùng với combineLatest — vì combineLatest chỉ emit khi TẤT CẢ sources đã emit. Nếu filter chưa emit (user chưa chọn gì), dashboard không hiển thị gì. startWith cho giá trị mặc định.

takeUntilDestroyed — Unsubscribe tự động (Angular 16+)

@Component({...})
export class InvoiceListComponent {
  private destroyRef = inject(DestroyRef);

  ngOnInit() {
    this.invoiceService.getAll().pipe(
      takeUntilDestroyed(this.destroyRef)
    ).subscribe(invoices => {
      this.invoices = invoices;
    });
  }
}

Trước Angular 16, phải dùng Subject + takeUntil pattern thủ công. takeUntilDestroyed clean hơn — tự unsubscribe khi component destroy. Nếu subscribe mà không unsubscribe — memory leak, HTTP requests tiếp tục chạy sau khi navigate away.


Anti-Patterns — Những sai lầm phổ biến

1. Subscribe trong subscribe (nested subscribe)

// SAI — callback hell version RxJS
this.route.params.subscribe(params => {
  this.invoiceService.getById(params['id']).subscribe(invoice => {
    this.customerService.getById(invoice.customerId).subscribe(customer => {
      this.invoice = invoice;
      this.customer = customer;
    });
  });
});

// ĐÚNG — dùng operators
this.route.params.pipe(
  switchMap(params => this.invoiceService.getById(params['id'])),
  switchMap(invoice =>
    this.customerService.getById(invoice.customerId).pipe(
      map(customer => ({ invoice, customer }))
    )
  ),
  takeUntilDestroyed(this.destroyRef)
).subscribe(({ invoice, customer }) => {
  this.invoice = invoice;
  this.customer = customer;
});

Nested subscribe: không thể cancel, không thể retry, không thể combine với operators khác, memory leak vì inner subscribe không tự cleanup.

2. Quên unsubscribe

// SAI — memory leak
ngOnInit() {
  this.someService.getData().subscribe(data => {
    this.data = data;
  });
  // Component destroy → subscription vẫn sống → memory leak
}

// ĐÚNG — cách 1: async pipe (tốt nhất, Angular tự unsubscribe)
data$ = this.someService.getData();
// Template: {{ data$ | async }}

// ĐÚNG — cách 2: takeUntilDestroyed
ngOnInit() {
  this.someService.getData().pipe(
    takeUntilDestroyed(this.destroyRef)
  ).subscribe(data => this.data = data);
}

Quy tắc: ưu tiên async pipe — Angular tự manage subscription, không bao giờ leak. Nếu cần subscribe trong component (side effect, complex logic), luôn dùng takeUntilDestroyed.

HTTP requests (HttpClient) tự complete sau khi nhận response — technically không cần unsubscribe. NHƯNG nếu user navigate away trước khi response về, subscribe callback vẫn chạy, có thể set state trên destroyed component. Vẫn nên unsubscribe hoặc dùng async pipe.

3. Dùng subscribe chỉ để gán biến

// SAI — subscribe chỉ để gán
ngOnInit() {
  this.invoiceService.getAll().subscribe(data => {
    this.invoices = data;
  });
}
// Template: *ngFor="let inv of invoices"

// ĐÚNG — dùng async pipe
invoices$ = this.invoiceService.getAll();
// Template: *ngFor="let inv of invoices$ | async"

async pipe subscribe, nhận data, và tự unsubscribe khi component destroy. Không cần manage subscription, không memory leak, Angular tự handle change detection.

4. toPromise / firstValueFrom trong component

// SAI — biến Observable thành Promise, mất hết benefit RxJS
async ngOnInit() {
  this.invoices = await firstValueFrom(
    this.invoiceService.getAll()
  );
}

Nếu bạn convert mọi Observable thành Promise, tại sao dùng RxJS? Mất khả năng cancel, combine, retry. firstValueFrom hợp lệ trong service hoặc resolver — nhưng trong component, dùng Observable + async pipe.

5. shareReplay không có refCount

// NGUY HIỂM — Observable không bao giờ cleanup
const data$ = this.http.get('/api/data').pipe(
  shareReplay(1)  // Giữ subscription mãi mãi
);

// ĐÚNG — cleanup khi không còn subscriber
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

shareReplay(1) giữ subscription dù không còn subscriber — source Observable không bao giờ được unsubscribe. Thêm refCount: true để auto-unsubscribe khi subscriber cuối cùng unsubscribe.


Pattern: Component ViewModel với async pipe

Pattern mình dùng nhiều nhất — combine tất cả data thành 1 Observable, dùng 1 async pipe:

@Component({
  template: `
    <ng-container *ngIf="vm$ | async as vm; else loading">
      <app-header [title]="'Invoices (' + vm.total + ')'"/>
      <app-filters
        [statuses]="vm.statuses"
        (statusChange)="onStatusChange($event)"/>
      <app-invoice-table
        [invoices]="vm.invoices"
        [loading]="vm.tableLoading"/>
      <app-pagination
        [currentPage]="vm.page"
        [totalPages]="vm.totalPages"/>
    </ng-container>
    <ng-template #loading>
      <app-skeleton-loader/>
    </ng-template>
  `
})
export class InvoiceListComponent {
  private statusFilter$ = new BehaviorSubject<string>('all');
  private page$ = new BehaviorSubject<number>(1);

  vm$ = combineLatest([
    this.statusFilter$,
    this.page$
  ]).pipe(
    switchMap(([status, page]) =>
      this.invoiceService.getFiltered(status, page).pipe(
        map(response => ({
          invoices: response.items,
          total: response.totalCount,
          totalPages: response.totalPages,
          page,
          statuses: ['all', 'Draft', 'Approved', 'Overdue'],
          tableLoading: false
        })),
        startWith({
          invoices: [],
          total: 0,
          totalPages: 0,
          page,
          statuses: ['all', 'Draft', 'Approved', 'Overdue'],
          tableLoading: true
        }),
        catchError(() => of({
          invoices: [],
          total: 0,
          totalPages: 0,
          page,
          statuses: ['all', 'Draft', 'Approved', 'Overdue'],
          tableLoading: false
        }))
      )
    )
  );

  onStatusChange(status: string) { this.statusFilter$.next(status); }
  onPageChange(page: number) { this.page$.next(page); }
}

Zero subscribe trong component. Một async pipe quản lý tất cả. Loading state tự động qua startWith. Error handling qua catchError. Filter/page change trigger re-fetch qua switchMap (cancel request cũ). Component destroy → async pipe unsubscribe → tất cả cleanup tự động.


Kết luận

RxJS trong Angular không cần biết 100+ operators. Bốn map operators (switchMap, mergeMap, concatMap, exhaustMap), vài combination operators (combineLatest, forkJoin), utilities (tap, catchError, startWith, takeUntilDestroyed), và async pipe — đủ cho 95% use cases.

Quan trọng hơn operators là tránh anti-patterns: không subscribe trong subscribe, không quên unsubscribe, ưu tiên async pipe, catchError trong inner Observable. Những sai lầm này gây memory leak, race condition, và bugs khó debug.

Bắt đầu bằng async pipe cho mọi thứ. Khi cần logic phức tạp hơn, thêm operators vào pipe. Khi cần side effect, subscribe với takeUntilDestroyed. Giữ component clean — logic nặng đẩy vào service, component chỉ combine và hiển thị.

Tags:
Culi Dev

Culi Dev

Enjoy coding, enjoy life!

Leave a comment

Your email address will not be published. Required fields are marked *

Your experience on this site will be improved by allowing cookies Cookie Policy