Learn RxJS Angular, RxJS pipe, async pipe, RxJS observables, BehaviorSubject, combineLatest

In this post I want to talk about RxJS, RxJS streams, observables and how to use RxJS inside Angular.

Inside RxJS we have more than a hundred different methods. And it is really difficult for a beginner to understand all these methods together with Angular. This is why in this post I want to focus only on the things that I use inside RxJS every single day, because they are super efficient and are needed in every single project.

RxJS of

The first thing that I want to talk about in RxJS is the of function.

export class AppComponent implements OnInit {
  users = [
    { id: '1', name: 'John', isActive: true },
    { id: '2', name: 'Jack', isActive: true },
    { id: '3', name: 'Mike', isActive: true },
  ];

As you can see here I have an array of users and this is just a plain JavaScript array. Now let's say that we want to create a stream from this data.

users$ = of(this.users);

Here we created a stream users$ (by convention we mark streams with a dollar sign at the end) by calling the of function from RxJS. We can pass any data that we want to of.

The of function converts plain data into a stream

Now as you can see users$ is an Observable<{id: string; name: string; isActive: boolean}[]>.

This is a really nice way to convert your plain data into a stream.
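To see what of actually does, here is a minimal sketch outside of Angular. It assumes a plain TypeScript file with rxjs installed; the User interface and the log array are names I added just for illustration.

```typescript
import { of } from 'rxjs';

// Hypothetical type for this sketch, matching the users from the post.
interface User {
  id: string;
  name: string;
  isActive: boolean;
}

const users: User[] = [
  { id: '1', name: 'John', isActive: true },
  { id: '2', name: 'Jack', isActive: true },
];

// of(users) emits the whole array as ONE value and then completes.
const users$ = of(users);

const log: string[] = [];

users$.subscribe({
  next: (value) => log.push(`next: ${value.length} users`),
  complete: () => log.push('complete'),
});

console.log(log); // ['next: 2 users', 'complete']
```

Note that the stream does not emit the users one by one. The array itself is the single value inside the stream.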

Wrong approach

But what people typically do when they just start with RxJS or Angular is try to get rid of streams as soon as possible.

export class AppComponent implements OnInit {
  ...
  ngOnInit(): void {
    this.users$.subscribe((users) => {
      console.log('users', users);
    });
  }
}

So they subscribe to every stream they have in order to get plain data, because it is more comfortable for them to work without streams. This is totally fine as a beginner approach, but it is not how streams are intended to be used. RxJS inside Angular shines when we use it together with the async pipe and use subscribe as little as possible.

Async pipe

The typical use case here is rendering a list of our users. And here we should not use subscribe but the async pipe in HTML instead.

<div *ngFor="let user of users$ | async">
    {{ user.name }}
</div>

As you can see in browser we rendered our array without subscribe function.

If you can use async pipe instead of subscribe then just do it.

In this case Angular itself creates a subscription and unsubscribes when our component is destroyed. If you write subscribe yourself you MUST also unsubscribe from the subscription yourself. Otherwise a long-lived stream can cause a memory leak.
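If you really have to subscribe manually, the cleanup could look roughly like this. It is a sketch in plain TypeScript, not a full component: ticks$ is a hypothetical long-lived stream, and the comments mark where the calls would usually live inside an Angular component.

```typescript
import { Subject, Subscription } from 'rxjs';

// Hypothetical long-lived stream that never completes on its own.
const ticks$ = new Subject<number>();

const seen: number[] = [];

// Roughly what we would write in ngOnInit: keep a reference to the subscription.
const subscription: Subscription = ticks$.subscribe((value) => seen.push(value));

ticks$.next(1);
ticks$.next(2);

// Roughly what we would write in ngOnDestroy.
subscription.unsubscribe();

// Emitted after unsubscribe, so our callback is not called anymore.
ticks$.next(3);

console.log(seen); // [1, 2]
```

With the async pipe all of this bookkeeping is done by Angular for us.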

RxJS Map

But typically people don't understand how they can transform data with RxJS. This is why they want to get rid of RxJS and just write plain JavaScript. So here I want to show you some methods that will help you to work with streams.

Let's say that we want to create from our users$ stream a new stream with just the names of these users. So from every single object we want to take the name.

usernames$ = this.users$.pipe(map((users) => users.map((user) => user.name)));

Here we created a usernames$ stream by transforming users$ with map. But you must remember that inside RxJS, in order to apply any operators, we must call the pipe function. It allows us to apply functions to the stream. Here we used the map function inside pipe, so we get access to the array inside the stream and return a transformed value.

So inside usernames$ we got an Observable<string[]>.

We didn't use here subscribe and we didn't create additional properties here. Now we can directly render this stream inside our HTML.

<div *ngFor="let username of usernames$ | async">
  {{ username }}
</div>

As you can see it is working out of the box.

If you need to transform a stream it makes a lot of sense to use the map function for this.
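Here is a small sketch of pipe and map that you can run outside of Angular. It assumes rxjs is installed; the emissions array is only there to show what comes out of the stream. I import map from 'rxjs/operators' here, and in RxJS 7.2+ it can also be imported directly from 'rxjs'.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const users$ = of([
  { id: '1', name: 'John', isActive: true },
  { id: '2', name: 'Jack', isActive: true },
]);

// The outer map is the RxJS operator: it runs once per emitted value (the whole array).
// The inner map is the plain JavaScript array method: it runs once per user.
const usernames$ = users$.pipe(
  map((users) => users.map((user) => user.name))
);

const emissions: string[][] = [];
usernames$.subscribe((usernames) => emissions.push(usernames));

console.log(emissions); // [['John', 'Jack']]
```

The original users$ stream is not changed. pipe returns a new stream, so we can derive as many streams from users$ as we need.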

RxJS Filter

Another super important function is filter. As you can understand from the name, it filters our data. But it doesn't work like filter inside JavaScript. When we filter an array inside JavaScript we simply end up with fewer elements. But when we filter a stream, each emitted value passes through the stream only if the predicate returns true for it.

For example here we can create filteredUsers$ and we will have data inside only when all our users are active.

filteredUsers$ = this.users$.pipe(
  filter((users) => users.every((user) => user.isActive))
);

Here we used the filter function from RxJS and inside it we checked with every that every single user is active. If one of the users is inactive the value is dropped, so this stream will never emit anything.

<div *ngFor="let user of filteredUsers$ | async">
  {{ user.name }}
</div>

This block will be rendered only if all our users are active. The same goes for subscribe: if we subscribe to this stream we will never see our console.log when the value doesn't pass the filter.

this.filteredUsers$.subscribe((users) => {
  console.log('users', users);
});

Also you should not forget that you can easily chain functions on a stream one after another.

filteredUsers$ = this.users$.pipe(
  filter((users) => users.every((user) => user.isActive)),
  map((users) => users.map((user) => user.name))
);

Then we get a list of usernames only if all our users are active.
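To make the difference between the two kinds of filter visible, here is a sketch that runs both on the same data. It assumes rxjs is installed; usernamesIfAllActive and namesOfActiveUsers are hypothetical helpers I wrote only for this comparison.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

interface User {
  id: string;
  name: string;
  isActive: boolean;
}

const allActive: User[] = [
  { id: '1', name: 'John', isActive: true },
  { id: '2', name: 'Jack', isActive: true },
];
const oneInactive: User[] = [
  { id: '1', name: 'John', isActive: true },
  { id: '2', name: 'Jack', isActive: false },
];

// RxJS filter: the whole emitted value passes through or is dropped.
function usernamesIfAllActive(users: User[]): string[][] {
  const emissions: string[][] = [];
  of(users)
    .pipe(
      filter((list) => list.every((user) => user.isActive)),
      map((list) => list.map((user) => user.name))
    )
    .subscribe((names) => emissions.push(names));
  return emissions;
}

// Array filter inside RxJS map: the stream always emits, but with fewer elements.
function namesOfActiveUsers(users: User[]): string[][] {
  const emissions: string[][] = [];
  of(users)
    .pipe(map((list) => list.filter((user) => user.isActive).map((user) => user.name)))
    .subscribe((names) => emissions.push(names));
  return emissions;
}

console.log(usernamesIfAllActive(allActive)); // [['John', 'Jack']]
console.log(usernamesIfAllActive(oneInactive)); // [] (nothing was emitted)
console.log(namesOfActiveUsers(oneInactive)); // [['John']]
```

So if you want to show only the active users, the array filter inside map is what you need. The RxJS filter is for deciding whether the value should reach the subscriber at all.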

RxJS BehaviorSubject

What you will use every single day is BehaviorSubject. It might sound really scary but actually it is not. Let's create here a new user stream.

user$ = new BehaviorSubject<{ id: string; name: string } | null>(null);

Here in the angle brackets we defined what type of data the stream can hold. In the round brackets we set the initial value.

Why do we write code like this? Typically we want to get some data from the API. For this you need a stream that you will fill with data. So you have a page where you render your user. By default you don't have a user, and this is why we wrote null.

But then after some time we fetch the user from the backend and we must update our stream. At that moment the template that uses this stream is automatically re-rendered. This is exactly why we need BehaviorSubject.

ngOnInit(): void {
  setTimeout(() => {
    this.user$.next({ id: '1', name: 'John' });
  }, 2000);
}

Here inside ngOnInit we push data into the stream by calling next after 2 seconds to simulate an API call.

Let's write here subscribe so we understand how it works.

this.user$.subscribe((user) => {
  console.log('user', user);
});

As you can see we get 2 logs. Once with null and after 2 seconds with correct data.

If we need to render this data we can do it directly inside html.

<div *ngIf="user$ | async as user">{{ user.name }}</div>

Here we render this block only when the value inside our stream is not null, which is exactly what we wanted.
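What makes BehaviorSubject special is that it always holds a current value and gives it to every new subscriber. Here is a sketch of that behavior without the timeout, assuming rxjs is installed. The User interface and the two log arrays are names I added for illustration.

```typescript
import { BehaviorSubject } from 'rxjs';

interface User {
  id: string;
  name: string;
}

const user$ = new BehaviorSubject<User | null>(null);

// The first subscriber immediately receives the initial value (null).
const firstLog: (User | null)[] = [];
user$.subscribe((user) => firstLog.push(user));

// Simulates the moment when the API response arrives.
user$.next({ id: '1', name: 'John' });

// A late subscriber receives only the latest value, not the whole history.
const lateLog: (User | null)[] = [];
user$.subscribe((user) => lateLog.push(user));

console.log(firstLog); // [null, { id: '1', name: 'John' }]
console.log(lateLog); // [{ id: '1', name: 'John' }]
console.log(user$.value); // { id: '1', name: 'John' }
```

This is why it works so well with the async pipe: whenever the template subscribes, it gets the current state right away.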

RxJS fromEvent

Another important function that you will use from time to time is fromEvent. It helps you to work with DOM events as streams.

documentClick$ = fromEvent(document, 'click');

ngOnInit(): void {
  this.documentClick$.subscribe((e) => {
    console.log('e', e);
  });
}

Here we created a stream for click event on our document. After this we subscribed to this event and we can do something every single time when we click anywhere.

And here you might ask: does it make any sense to do this and not just use addEventListener? If you need to apply RxJS methods on your stream or you want to combine data from different streams it makes a lot of sense.
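Here is a sketch of applying an RxJS method to an event stream and cleaning up afterwards. To keep it runnable outside the browser I assume a plain EventTarget as a stand-in for document and dispatch the events by hand; in a real component you would pass document or an element instead.

```typescript
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

// Assumption: a plain EventTarget stands in for `document` in this sketch.
const target = new EventTarget();

const eventTypes: string[] = [];

// fromEvent calls addEventListener for us when we subscribe.
const subscription = fromEvent<Event>(target, 'click')
  .pipe(map((event) => event.type))
  .subscribe((type) => eventTypes.push(type));

target.dispatchEvent(new Event('click'));
target.dispatchEvent(new Event('click'));

// Unsubscribing removes the listener, just like removeEventListener.
subscription.unsubscribe();

// Nobody is listening anymore, so this click is ignored.
target.dispatchEvent(new Event('click'));

console.log(eventTypes); // ['click', 'click']
```

The nice part is that removing the listener is the same unsubscribe call that we use for every other stream.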

RxJS CombineLatest

This function is extremely important. It is exactly about combining our streams of data. Typically you want to take all the streams of a component and combine them together. In this case the component is much easier to maintain and your markup inside HTML will be much cleaner.

data$ = combineLatest([
  this.users$,
  this.usernames$,
  this.filteredUsers$,
]).pipe(
  map(([users, usernames, filteredUsers]) => ({
    users,
    usernames,
    filteredUsers,
  }))
);

Here we created a new stream data$ and combined 3 streams in it. The problem is that combineLatest gives us an array of values, which is not comfortable to work with. This is why we used map to transform it into an object.

Now every single time a new value comes in any stream our data$ stream will be updated. Keep in mind that combineLatest emits its first value only after every source stream has emitted at least once.
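To see how data$ reacts to updates, here is a smaller sketch with two streams, assuming rxjs is installed. selectedUser$ is a hypothetical second stream that I added only so there is something to update.

```typescript
import { BehaviorSubject, combineLatest, of } from 'rxjs';
import { map } from 'rxjs/operators';

const users$ = of(['John', 'Jack']);

// Hypothetical second stream for this sketch.
const selectedUser$ = new BehaviorSubject<string | null>(null);

const data$ = combineLatest([users$, selectedUser$]).pipe(
  // combineLatest gives us an array of the latest values, we turn it into an object.
  map(([users, selectedUser]) => ({ users, selectedUser }))
);

const snapshots: { users: string[]; selectedUser: string | null }[] = [];
data$.subscribe((data) => snapshots.push(data));

// A new value in ONE stream produces a new combined value with the latest of BOTH.
selectedUser$.next('Jack');

console.log(snapshots);
// [
//   { users: ['John', 'Jack'], selectedUser: null },
//   { users: ['John', 'Jack'], selectedUser: 'Jack' }
// ]
```

If one of the sources never emits (for example a filtered stream whose value was dropped), the combined stream never emits either.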

And we can adjust our HTML:

<div *ngIf="data$ | async as data">
  <div *ngFor="let user of data.users">
    {{ user.name }}
  </div>

  ----

  <div *ngFor="let username of data.usernames">
    {{ username }}
  </div>

  ----

  <div *ngFor="let user of data.filteredUsers">
    {{ user.name }}
  </div>
</div>

Here we used the as keyword, which creates a local template variable from the stream. Now inside this block we can access the data of every single stream without additional async pipes.

This is the best way to work with streams inside a component

Oleksandr Kocherhin
Oleksandr Kocherhin is a full-stack developer with a passion for learning and sharing knowledge on Monsterlessons Academy and on his YouTube channel. With around 15 years of programming experience and nearly 9 years of teaching, he has a deep understanding of both disciplines. He believes in learning by doing, a philosophy that is reflected in every course he teaches. He loves exploring new web and mobile technologies, and his courses are designed to give students an edge in the fast-moving tech industry.