rxjs skill

This skill helps you implement RxJS observables, apply operators, manage memory with unsubscribe patterns, and build reactive data pipelines in Angular.

npx playbooks add skill pluginagentmarketplace/custom-plugin-angular --skill rxjs

---
name: rxjs-implementation
description: Implement RxJS observables, apply operators, fix memory leaks with unsubscribe patterns, handle errors, create subjects, and build reactive data pipelines in Angular applications.
sasmp_version: "1.3.0"
bonded_agent: 03-reactive-programming
bond_type: PRIMARY_BOND
---

# RxJS Implementation Skill

## Quick Start

### Observable Basics
```typescript
import { Observable } from 'rxjs';

// Create observable
const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

// Subscribe
const subscription = observable.subscribe({
  next: (value) => console.log(value),
  error: (error) => console.error(error),
  complete: () => console.log('Done')
});

// Unsubscribe
subscription.unsubscribe();
```
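The observable above completes synchronously, so its `unsubscribe()` call has nothing left to cancel. The sketch below shows what unsubscribing does for a stream that stays open: the function returned from the `Observable` constructor callback runs as teardown. The names `ticks$`, `received`, and `log` are illustrative only.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];
const received: number[] = [];

// This observable never completes on its own. The function it returns
// is the teardown that RxJS runs when the subscriber unsubscribes.
const ticks$ = new Observable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  return () => {
    log.push('teardown');
  };
});

const sub = ticks$.subscribe((value) => received.push(value));

// Unsubscribing closes the subscription and triggers the teardown.
sub.unsubscribe();

console.log(received, log, sub.closed);
```

In a real stream the teardown is where a timer is cleared or an event listener is removed.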

### Common Operators
```typescript
import { map, filter, switchMap, takeUntil } from 'rxjs/operators';

// Transformation
data$.pipe(
  map(user => user.name),
  filter(name => name.length > 0)
).subscribe(name => console.log(name));

// Higher-order
userId$.pipe(
  switchMap(id => this.userService.getUser(id))
).subscribe(user => console.log(user));
```

## Subjects

### Subject Types
```typescript
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';

// Subject - No initial value
const subject = new Subject<string>();
subject.next('hello');

// BehaviorSubject - Has initial value
const behavior = new BehaviorSubject<string>('initial');
behavior.next('new value');

// ReplaySubject - Replays N values
const replay = new ReplaySubject<string>(3);
replay.next('one');
replay.next('two');
```
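The practical difference between the three subject types shows up when a subscriber arrives late. A minimal sketch, using hypothetical collector arrays to record what each late subscriber receives:

```typescript
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';

const subject = new Subject<string>();
const behavior = new BehaviorSubject<string>('initial');
const replay = new ReplaySubject<string>(2); // buffer the last 2 values

// Emit three values before anyone subscribes.
for (const value of ['one', 'two', 'three']) {
  subject.next(value);
  behavior.next(value);
  replay.next(value);
}

const fromSubject: string[] = [];
const fromBehavior: string[] = [];
const fromReplay: string[] = [];

// Late subscribers.
subject.subscribe((v) => fromSubject.push(v));
behavior.subscribe((v) => fromBehavior.push(v));
replay.subscribe((v) => fromReplay.push(v));

// One more value after subscribing.
subject.next('four');
behavior.next('four');
replay.next('four');

console.log(fromSubject);  // only values emitted after subscribing
console.log(fromBehavior); // current value at subscribe time, then new ones
console.log(fromReplay);   // last 2 buffered values, then new ones
```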

### Service with Subject
```typescript
@Injectable()
export class NotificationService {
  private messageSubject = new Subject<string>();
  public message$ = this.messageSubject.asObservable();

  notify(message: string) {
    this.messageSubject.next(message);
  }
}

// Usage
constructor(private notification: NotificationService) {
  this.notification.message$.subscribe(msg => {
    console.log('Notification:', msg);
  });
}
```

## Transformation Operators

```typescript
import { map, switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';

// map - Transform values
source$.pipe(
  map(user => user.name)
)

// switchMap - Switch to new observable (cancel previous)
userId$.pipe(
  switchMap(id => this.userService.getUser(id))
)

// mergeMap - Merge all results
fileIds$.pipe(
  mergeMap(id => this.downloadFile(id))
)

// concatMap - Sequential processing
tasks$.pipe(
  concatMap(task => this.processTask(task))
)

// exhaustMap - Ignore new while processing
clicks$.pipe(
  exhaustMap(() => this.longRequest())
)
```
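To see the cancellation behavior of `switchMap` next to `mergeMap`, the sketch below replaces the service call with two manually controlled subjects. `responseA$`, `responseB$`, and `lookup` are hypothetical stand-ins for an HTTP request, so the timing of each "response" can be chosen by hand.

```typescript
import { Subject } from 'rxjs';
import { switchMap, mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for two in-flight requests.
const responseA$ = new Subject<string>();
const responseB$ = new Subject<string>();
const lookup = (id: 'a' | 'b'): Subject<string> =>
  id === 'a' ? responseA$ : responseB$;

const ids$ = new Subject<'a' | 'b'>();
const switched: string[] = [];
const merged: string[] = [];

ids$.pipe(switchMap((id) => lookup(id))).subscribe((v) => switched.push(v));
ids$.pipe(mergeMap((id) => lookup(id))).subscribe((v) => merged.push(v));

ids$.next('a'); // both pipelines subscribe to responseA$
ids$.next('b'); // switchMap drops responseA$; mergeMap keeps both

responseA$.next('result for a'); // arrives after the switch
responseB$.next('result for b');

console.log(switched); // the stale response for "a" is ignored
console.log(merged);   // every response is delivered
```

This is why `switchMap` suits lookups where only the latest request matters, while `mergeMap` suits work where every result must be kept.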

## Filtering Operators

```typescript
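import { filter, first, take, takeUntil, distinct, distinctUntilChanged, debounceTime } from 'rxjs/operators';

// filter - Only pass matching values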
data$.pipe(
  filter(item => item.active)
)

// first - Take first value
data$.pipe(first())

// take - Take N values
data$.pipe(take(5))

// takeUntil - Take until condition
data$.pipe(
  takeUntil(this.destroy$)
)

// distinct - Drop any value that has been seen before
data$.pipe(distinct())

// distinctUntilChanged - Drop only consecutive duplicates
data$.pipe(distinctUntilChanged())

// debounceTime - Wait N ms
input$.pipe(
  debounceTime(300),
  distinctUntilChanged()
)
```
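`distinct` and `distinctUntilChanged` are easy to confuse. A small sketch on a fixed input (the values are arbitrary) shows the difference:

```typescript
import { of } from 'rxjs';
import { distinct, distinctUntilChanged } from 'rxjs/operators';

const values$ = of(1, 1, 2, 2, 1, 3);

const everSeen: number[] = [];
const consecutive: number[] = [];

// distinct remembers every value it has emitted so far.
values$.pipe(distinct()).subscribe((v) => everSeen.push(v));

// distinctUntilChanged only compares with the previous value.
values$.pipe(distinctUntilChanged()).subscribe((v) => consecutive.push(v));

console.log(everSeen);    // [1, 2, 3]
console.log(consecutive); // [1, 2, 1, 3]
```

Because `distinct` keeps a growing set of seen values, `distinctUntilChanged` is usually the safer choice on long-lived streams such as form input.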

## Combination Operators

```typescript
import { combineLatest, merge, concat, zip } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

// combineLatest - Latest from all
combineLatest([user$, settings$, theme$]).pipe(
  map(([user, settings, theme]) => ({ user, settings, theme }))
)

// merge - Values from any
merge(click$, hover$, input$)

// concat - Sequential
concat(request1$, request2$, request3$)

// zip - Wait for all
zip(form1$, form2$, form3$)

// withLatestFrom - Combine with latest
click$.pipe(
  withLatestFrom(user$),
  map(([click, user]) => ({ click, user }))
)
```
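The emission timing of `combineLatest`, `zip`, and `withLatestFrom` differs in ways the one-line comments cannot show. The sketch below drives all three from the same two subjects; `a$`, `b$`, and the collector arrays are illustrative names.

```typescript
import { Subject, combineLatest, zip } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const a$ = new Subject<string>();
const b$ = new Subject<number>();

const combined: string[] = [];
const zipped: string[] = [];
const sampled: string[] = [];

combineLatest([a$, b$]).subscribe(([a, b]) => combined.push(`${a}${b}`));
zip(a$, b$).subscribe(([a, b]) => zipped.push(`${a}${b}`));
a$.pipe(withLatestFrom(b$)).subscribe(([a, b]) => sampled.push(`${a}${b}`));

a$.next('x'); // nothing yet: b$ has not emitted
b$.next(1);   // combineLatest and zip emit "x1"
a$.next('y'); // combineLatest and withLatestFrom emit "y1"; zip waits for b$
b$.next(2);   // combineLatest and zip emit "y2"

console.log(combined); // ["x1", "y1", "y2"]
console.log(zipped);   // ["x1", "y2"]
console.log(sampled);  // ["y1"]
```

Note that `withLatestFrom` silently dropped `'x'` because `b$` had no value at that moment, and it never emits when only `b$` changes.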

## Error Handling

```typescript
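import { of, throwError } from 'rxjs';
import { catchError, retry, timeout } from 'rxjs/operators';

// catchError - Handle errors and continue with a fallback value
data$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return of(defaultValue);
  })
)

// retry - Resubscribe up to 3 times, then rethrow the last error
request$.pipe(
  retry(3),
  // RxJS 7+ expects a factory here; on RxJS 6 use throwError(error)
  catchError(error => throwError(() => error))
)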

// timeout - Timeout if no value
request$.pipe(
  timeout(5000),
  catchError(error => of(null))
)
```
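How `retry` and `catchError` interact is easier to follow with a request that fails a known number of times. In this sketch, `request` is a hypothetical helper built on `defer`, which converts an exception thrown in its factory into an error notification; it is not part of any real service.

```typescript
import { Observable, defer, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

const attempts = { flaky: 0, broken: 0 };

// Hypothetical request that fails `failuresBeforeSuccess` times per key.
function request(
  key: 'flaky' | 'broken',
  failuresBeforeSuccess: number
): Observable<string> {
  return defer(() => {
    attempts[key] += 1;
    if (attempts[key] <= failuresBeforeSuccess) {
      throw new Error(`${key} attempt ${attempts[key]} failed`);
    }
    return of(`${key} ok`);
  });
}

const results: string[] = [];

// Fails twice, succeeds on the third attempt: retry(3) is enough.
request('flaky', 2)
  .pipe(retry(3), catchError(() => of('fallback')))
  .subscribe((v) => results.push(v));

// Never succeeds: after 1 try + 2 retries, catchError supplies the fallback.
request('broken', Infinity)
  .pipe(retry(2), catchError(() => of('fallback')))
  .subscribe((v) => results.push(v));

console.log(results, attempts);
```

`retry(n)` resubscribes to the source, so the total number of attempts is `n + 1`. Place `catchError` after `retry` so the fallback is used only once the retries are exhausted.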

## Memory Leak Prevention

### Unsubscribe Pattern
```typescript
private destroy$ = new Subject<void>();

ngOnInit() {
  this.data$.pipe(
    takeUntil(this.destroy$)
  ).subscribe(data => {
    this.processData(data);
  });
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}
```
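The same pattern can be exercised outside Angular to confirm that it really stops delivery. In the sketch below, `Widget` is a hypothetical stand-in for a component, with `init` and `destroy` playing the roles of `ngOnInit` and `ngOnDestroy`.

```typescript
import { Observable, Subject, Subscription } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical stand-in for an Angular component.
class Widget {
  readonly seen: number[] = [];
  private readonly destroy$ = new Subject<void>();

  // Plays the role of ngOnInit.
  init(data$: Observable<number>): Subscription {
    return data$
      .pipe(takeUntil(this.destroy$))
      .subscribe((value) => this.seen.push(value));
  }

  // Plays the role of ngOnDestroy.
  destroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const data$ = new Subject<number>();
const widget = new Widget();
const subscription = widget.init(data$);

data$.next(1);
data$.next(2);
widget.destroy();
data$.next(3); // emitted after destroy, so it is never delivered

console.log(widget.seen, subscription.closed);
```

Keeping `takeUntil` as the last operator in the pipe is generally advisable, since operators placed after it can keep inner subscriptions alive.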

### Async Pipe (Preferred)
```typescript
// Component
export class UserComponent {
  user$ = this.userService.getUser(1);

  constructor(private userService: UserService) {}
}

// Template - the async pipe subscribes and unsubscribes automatically
<div *ngIf="user$ | async as user">
  <p>{{ user.name }}</p>
</div>
```

## Advanced Patterns

### Share Operator
```typescript
// Hot observable - Share single subscription
readonly users$ = this.http.get('/api/users').pipe(
  shareReplay(1) // Cache last result
);

// Multiple subscribers now share a single HTTP request
this.users$.subscribe(users => console.log(users));
this.users$.subscribe(users => console.log(users)); // Replays the cached result
```
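The effect of `shareReplay(1)` can be checked by counting how often the source runs. In this sketch, `fetchUsers$` is a hypothetical stand-in for `this.http.get('/api/users')` that increments a counter on each subscription.

```typescript
import { defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let requests = 0;

// Hypothetical stand-in for an HTTP GET: each subscription is one request.
const fetchUsers$ = defer(() => {
  requests += 1;
  return of(['Ada', 'Linus']);
});

const users$ = fetchUsers$.pipe(shareReplay(1));

const first: string[][] = [];
const second: string[][] = [];

users$.subscribe((users) => first.push(users));
users$.subscribe((users) => second.push(users)); // served from the cache

const sharedRequests = requests;

// Without shareReplay, every subscriber triggers its own request.
fetchUsers$.subscribe();
fetchUsers$.subscribe();

console.log(sharedRequests, requests);
```

The cached value lives as long as `users$` does, so this suits data that rarely changes. Data that must be refreshed needs an explicit reload trigger.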

### Scan for State
```typescript
// Accumulate state
const counter$ = clicks$.pipe(
  scan((count) => count + 1, 0)
)

// Complex state
const appState$ = actions$.pipe(
  scan((state, action) => {
    switch(action.type) {
      case 'ADD_USER': return { ...state, users: [...state.users, action.user] };
      case 'DELETE_USER': return { ...state, users: state.users.filter(u => u.id !== action.id) };
      default: return state;
    }
  }, initialState)
)
```
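A typed, self-contained version of the reducer above makes the state transitions easier to follow. The `User`, `AppState`, and `Action` shapes are assumptions chosen to match the snippet; a real application would define its own.

```typescript
import { from } from 'rxjs';
import { scan } from 'rxjs/operators';

// Assumed shapes, for illustration only.
interface User { id: number; name: string; }
interface AppState { users: User[]; }
type Action =
  | { type: 'ADD_USER'; user: User }
  | { type: 'DELETE_USER'; id: number };

const initialState: AppState = { users: [] };

// Pure reducer: returns a new state object instead of mutating the old one.
function reducer(state: AppState, action: Action): AppState {
  switch (action.type) {
    case 'ADD_USER':
      return { ...state, users: [...state.users, action.user] };
    case 'DELETE_USER':
      return { ...state, users: state.users.filter((u) => u.id !== action.id) };
    default:
      return state;
  }
}

const actions: Action[] = [
  { type: 'ADD_USER', user: { id: 1, name: 'Ada' } },
  { type: 'ADD_USER', user: { id: 2, name: 'Linus' } },
  { type: 'DELETE_USER', id: 1 },
];

// scan emits the accumulated state after every action.
const history: string[][] = [];
from(actions)
  .pipe(scan(reducer, initialState))
  .subscribe((state) => history.push(state.users.map((u) => u.name)));

console.log(history); // [["Ada"], ["Ada", "Linus"], ["Linus"]]
```

Keeping the reducer as a standalone pure function means it can be unit tested without any observables.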

### forkJoin for Multiple Requests
```typescript
// Parallel requests
forkJoin({
  users: this.userService.getUsers(),
  settings: this.settingService.getSettings(),
  themes: this.themeService.getThemes()
}).subscribe(({ users, settings, themes }) => {
  console.log('All loaded:', users, settings, themes);
})
```
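Two properties of `forkJoin` are worth seeing in isolation: it emits once, with the last value of each source, and it emits nothing if any source completes without a value. The sketch below uses `of` and `EMPTY` as hypothetical stand-ins for the service calls.

```typescript
import { EMPTY, forkJoin, of } from 'rxjs';

const emissions: Array<{ users: string[]; theme: string; counter: number }> = [];

forkJoin({
  users: of(['Ada', 'Linus']),
  theme: of('dark'),
  counter: of(1, 2, 3), // only the last value, 3, is used
}).subscribe((result) => emissions.push(result));

// If one source completes without emitting, forkJoin completes silently.
let emittedWithEmpty = 0;
let completedWithEmpty = false;
forkJoin({ users: of(['Ada']), missing: EMPTY }).subscribe({
  next: () => { emittedWithEmpty += 1; },
  complete: () => { completedWithEmpty = true; },
});

console.log(emissions, emittedWithEmpty, completedWithEmpty);
```

HTTP requests emit once and complete, which is why `forkJoin` fits them well. For long-lived streams that never complete, `combineLatest` is usually the better match.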

## Testing Observables

```typescript
import { map } from 'rxjs/operators';
import { marbles } from 'rxjs-marbles';

it('should map values correctly', marbles((m) => {
  const source = m.hot('a-b-|', { a: 1, b: 2 });
  const expected = m.cold('x-y-|', { x: 2, y: 4 });

  const result = source.pipe(
    map(x => x * 2)
  );

  m.expect(result).toBeObservable(expected);
}));
```
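As an alternative that needs no extra package, RxJS ships a `TestScheduler` in `rxjs/testing`. The sketch below runs the same marble test without a test framework; the comparator uses a plain JSON comparison as an assumption, where a Jasmine or Jest suite would typically call `expect(actual).toEqual(expected)` instead.

```typescript
import { map } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

let comparisons = 0;

// The comparator receives the actual and expected notification frames.
const scheduler = new TestScheduler((actual, expected) => {
  comparisons += 1;
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('Marble assertion failed');
  }
});

scheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('a-b-|', { a: 1, b: 2 });
  const result$ = source$.pipe(map((x) => x * 2));

  // Same timing as the source, with doubled values.
  expectObservable(result$).toBe('x-y-|', { x: 2, y: 4 });
});

console.log('comparisons run:', comparisons);
```

Inside `scheduler.run`, time is virtual, so operators such as `debounceTime` can be tested without real delays.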

## Best Practices

1. **Always unsubscribe**: Use takeUntil or async pipe
2. **Use higher-order operators**: switchMap, mergeMap, etc.
3. **Avoid nested subscriptions**: Use operators instead
4. **Share subscriptions**: Use share/shareReplay for expensive operations
5. **Handle errors**: Always include catchError
6. **Type your observables**: `Observable<User>` not just `Observable`

## Common Mistakes to Avoid

```typescript
// ❌ Wrong - Creates multiple subscriptions
this.data$.subscribe(d => {
  this.data$.subscribe(d2 => {
    // nested subscriptions!
  });
});

// ✅ Correct - Use switchMap
this.data$.pipe(
  switchMap(d => this.otherService.fetch(d))
).subscribe(result => {
  // handled
});

// ❌ Wrong - Memory leak
ngOnInit() {
  this.data$.subscribe(data => this.data = data);
}

// ✅ Correct - Unsubscribe or async
ngOnInit() {
  this.data$ = this.service.getData();
}
// In template: {{ data$ | async }}
```

## Resources

- [RxJS Documentation](https://rxjs.dev/)
- [Interactive Diagrams](https://rxmarbles.com/)
- [RxJS Operators](https://rxjs.dev/api)

Overview

This skill teaches how to implement RxJS observables and operators in Angular applications to build robust reactive pipelines. It focuses on creating observables and subjects, applying transformation/combination/filtering operators, handling errors, and preventing memory leaks with unsubscribe patterns. Practical examples and patterns help integrate RxJS into services and components for predictable state and side-effect management.

How this skill works

The skill inspects observable creation, operator composition, and subscription lifecycles to ensure correct data flow and resource cleanup. It demonstrates subject types (Subject, BehaviorSubject, ReplaySubject), higher-order mapping (switchMap, mergeMap, concatMap), combination operators (combineLatest, forkJoin), and error handling (catchError, retry, timeout). It also covers unsubscribe strategies (takeUntil, async pipe) and sharing strategies (shareReplay).

When to use it

  • When building reactive forms, HTTP requests, or event streams in Angular
  • When you need to compose and transform asynchronous data flows with operators
  • When multiple components must share a single data source or cached request
  • When preventing memory leaks from long-lived subscriptions is required
  • When implementing state management with streams and immutable updates

Best practices

  • Always unsubscribe: prefer takeUntil with a destroy$ or the async pipe in templates
  • Avoid nested subscriptions: use higher-order operators like switchMap and mergeMap
  • Share expensive Observables with share or shareReplay to prevent duplicate side effects
  • Handle errors locally with catchError and provide fallbacks or retries where appropriate
  • Type observables explicitly (e.g., Observable<User>) to keep compiler safety and clarity

Example use cases

  • Expose a NotificationService with a Subject and a public message$ observable for cross-component alerts
  • Load user details on id change with userId$.pipe(switchMap(id => userService.getUser(id))) to cancel prior requests
  • Combine user, settings, and theme streams with combineLatest to build a derived UI model
  • Use takeUntil(this.destroy$) in components to automatically unsubscribe on ngOnDestroy
  • Cache an HTTP GET with shareReplay(1) so multiple subscribers reuse the same response

FAQ

How do I avoid memory leaks when subscribing in components?

Use the async pipe in templates or takeUntil with a private destroy$ that you next() and complete() in ngOnDestroy.

When should I use switchMap vs mergeMap?

Use switchMap to cancel prior inner observables when new source values arrive; use mergeMap to process all inner observables concurrently when cancellation is not desired.