RxJS 5 Thinking Reactively | Ben Lesh

By: AngularConnect


Uploaded on 10/01/2016

Comments (8):

By anonymous    2017-09-20

The problem is a subtle but critical misunderstanding of how RxJS works, but fear not: this is very common.

So given your example:

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000)
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));

This epic's behavior can be described as follows: filter out every action whose type is not PING. When an action matches, wait 1000ms and then map that action to a different action, { type: PONG }, which is emitted and then dispatched by redux-observable. If at any time while the app is running someone dispatches an action of type CANCEL, takeUntil unsubscribes from the source, which means the entire chain unsubscribes and the epic terminates.

It might be helpful to see how this looks if you did it imperatively:

const pingEpic = action$ => {
  return new Rx.Observable(observer => {
    console.log('[pingEpic] subscribe');
    let timer;

    const subscription = action$.subscribe(action => {
      console.log('[pingEpic] received action: ' + action.type);

      // When anyone dispatches CANCEL, we stop listening entirely!
      if (action.type === CANCEL) {
        observer.complete();
        return;
      }

      if (action.type === PING) {
        timer = setTimeout(() => {
          const output = { type: PONG };
          observer.next(output);
        }, 1000);
      }
    });

    return {
      unsubscribe() {
        console.log('[pingEpic] unsubscribe');
        clearTimeout(timer);
        subscription.unsubscribe();
      }
    };
  });
};

You can run this code with a fake store here: http://jsbin.com/zeqasih/edit?js,console


Instead, what you usually want to do is insulate the subscriber chain you want to be cancellable from the top-level chain that is supposed to listen indefinitely. Although your example (amended from the docs) is contrived, let's run through it first.

Here we use the mergeMap operator to let us take the matched action and map to another, separate observable chain.

Demo: http://jsbin.com/nofato/edit?js,output

const pingEpic = action$ =>
  action$.ofType(PING)
    .mergeMap(() =>
      Observable.timer(1000)
        .takeUntil(action$.ofType(CANCEL))
        .mapTo({ type: PONG })
    );

We use Observable.timer to wait 1000ms, then map the value it emits (which happens to be the number zero, but that's not important here) to our PONG action. We also say we want to "take" from the timer source until either it completes normally or we receive an action of type CANCEL.

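This isolates the chains because mergeMap subscribes to each observable you return and stays subscribed until that inner observable errors or completes. When an inner observable completes, for example because takeUntil saw a CANCEL, mergeMap does not unsubscribe from the source you applied it to (action$.ofType(PING) in this example), so the epic keeps listening for the next PING. An inner error is different: it propagates through mergeMap and terminates the whole chain unless you catch it inside.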
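To mirror the earlier imperative version, here is a rough imperative sketch of what the mergeMap variant does. It is not how RxJS implements it. createActionStream is a hypothetical stand-in for action$, and the emit, schedule and cancel parameters are assumptions added so the sketch can run without redux-observable.

```javascript
// Action types named after the ones used in the text.
const PING = 'PING';
const PONG = 'PONG';
const CANCEL = 'CANCEL';

// Hypothetical stand-in for action$: subscribers are plain callbacks.
function createActionStream() {
  const listeners = new Set();
  return {
    dispatch(action) {
      // Copy first so a listener may unsubscribe while we iterate.
      [...listeners].forEach(listener => listener(action));
    },
    subscribe(listener) {
      listeners.add(listener);
      return { unsubscribe: () => listeners.delete(listener) };
    }
  };
}

// Imperative approximation of the mergeMap epic. `emit` receives output
// actions. `schedule` and `cancel` default to real timers and are only
// parameters so the sketch can be driven by a fake clock.
function pingEpic(action$, emit, schedule = setTimeout, cancel = clearTimeout) {
  const pending = new Set();

  return action$.subscribe(action => {
    if (action.type === PING) {
      // Each PING gets its own inner "chain": a single timer.
      const timer = schedule(() => {
        pending.delete(timer);
        emit({ type: PONG });
      }, 1000);
      pending.add(timer);
    }

    if (action.type === CANCEL) {
      // Only the inner chains are torn down. We keep listening.
      pending.forEach(timer => cancel(timer));
      pending.clear();
    }
  });
}
```

Compared with the first imperative version, CANCEL never calls observer.complete() on the outer chain. It only clears pending timers, so a later PING still produces a PONG.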

A more real-world example is in the Cancellation section of the redux-observable docs.

Here we placed the .takeUntil() inside our .mergeMap(), but after our AJAX call; this is important because we want to cancel only the AJAX request, not stop the Epic from listening for any future actions.

const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(fetchUserFulfilled)
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    );

This all may sound confusing, but like most powerful things, once you get it, it'll become intuitive. Ben Lesh does an excellent job of explaining how Observables work in his recent talk, including discussing how operators are a chain of Observables and even about isolating subscriber chains. Even though the talk is at AngularConnect, it's not Angular specific.


As an aside, it's important to note that your epics do not swallow or otherwise prevent actions from reaching the reducers, e.g. when you map an incoming action to another, different action. In fact, when your epic receives an action, it has already been through your reducers. Think of your epics as sidecar processes that listen to a stream of your app's actions but can't prevent normal redux things from happening; they can only emit new actions.
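The ordering described above can be sketched with a toy middleware. This is a heavily simplified illustration, not redux-observable's real code: createTinyEpicMiddleware and createTinyStore are hypothetical helpers, and the epic here receives an emit callback instead of returning an observable.

```javascript
// Hypothetical, heavily simplified middleware. Not redux-observable's real code.
function createTinyEpicMiddleware(epic) {
  return store => {
    const listeners = [];
    const action$ = { subscribe: listener => listeners.push(listener) };
    // The epic emits output actions by calling its second argument.
    epic(action$, output => store.dispatch(output));

    return next => action => {
      const result = next(action);                     // 1. reducers run first
      listeners.forEach(listener => listener(action)); // 2. then the epic sees it
      return result;
    };
  };
}

// Hypothetical one-middleware store so the sketch runs without Redux.
function createTinyStore(reducer, middleware) {
  let state = reducer(undefined, { type: '@@INIT' });
  const store = {
    getState: () => state,
    dispatch: action => dispatch(action)
  };
  const dispatch = middleware(store)(action => {
    state = reducer(state, action);
    return action;
  });
  return store;
}

const log = [];
const reducer = (state = [], action) => {
  log.push('reducer: ' + action.type); // side effect only for demonstration
  return state.concat(action.type);
};
const pingEpic = (action$, emit) =>
  action$.subscribe(action => {
    log.push('epic: ' + action.type);
    if (action.type === 'PING') emit({ type: 'PONG' });
  });

const store = createTinyStore(reducer, createTinyEpicMiddleware(pingEpic));
store.dispatch({ type: 'PING' });
console.log(log);
```

In this sketch the reducer logs PING before the epic does, and PING still ends up in the state even though the epic mapped it to PONG.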


By anonymous    2017-09-20

https://www.youtube.com/watch?v=3LKMwkuK0ZE

This video actually clarified a lot of things for me. Specifically, at the 32:00 mark he mentions .share, which makes an observable multicast. By default every subscriber gets its own copy of the stream, i.e. a separate execution of the source. So the fix looks like this:

let rngStream = Rx.Observable
  .interval(1000)
  .map(() => Math.ceil(Math.random()*100))
  .take(5)
  .share();

// Track the lowest number we've seen.
let minStream = rngStream
  .startWith(100)
  .scan((x, y) => Math.min(x, y))

// Track the highest number we've seen.
let maxStream = rngStream
  .startWith(0)
  .scan((x, y) => Math.max(x, y))

Rx.Observable.zip(rngStream, minStream, maxStream)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.min.js"></script>
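
The difference between "every subscriber gets its own copy" and a shared stream can be sketched without RxJS. This is only an approximation of what .share() does; createTicker, randomStream, share and producerRuns are hypothetical names, and a manual ticker stands in for interval(1000).

```javascript
// Manual clock standing in for interval(1000): call tick() to advance.
function createTicker() {
  const callbacks = new Set();
  return {
    onTick(callback) {
      callbacks.add(callback);
      return () => callbacks.delete(callback);
    },
    tick() {
      [...callbacks].forEach(callback => callback());
    }
  };
}

let producerRuns = 0;

// Cold source: every subscribe() starts a new, independent producer.
function randomStream(ticker) {
  return {
    subscribe(next) {
      producerRuns += 1;
      const stop = ticker.onTick(() => next(Math.ceil(Math.random() * 100)));
      return { unsubscribe: stop };
    }
  };
}

// share-like wrapper: subscribe to the source once and fan values out.
function share(source) {
  const listeners = new Set();
  let connection = null;
  return {
    subscribe(next) {
      listeners.add(next);
      if (!connection) {
        connection = source.subscribe(value =>
          listeners.forEach(listener => listener(value))
        );
      }
      return {
        unsubscribe() {
          listeners.delete(next);
          // Disconnect from the source when the last listener leaves.
          if (listeners.size === 0 && connection) {
            connection.unsubscribe();
            connection = null;
          }
        }
      };
    }
  };
}
```

With two subscribers on share(randomStream(ticker)), the producer runs once and both see the same numbers. With two subscribers directly on randomStream(ticker), the producer runs twice and each gets its own random sequence.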


By anonymous    2017-09-20

As I understand it, Observable.create and a normal JavaScript function look similar to me. So what is the actual difference between the two?

They are both just functions. Observables can seem like dark magic, but it all comes down to JavaScript objects and functions.

Basically, first we need an observer. This is just an object with three properties:

{
  // a function to be executed when we want the Observable to emit. Takes 1 arg
  next:     (val) => void,

  // a function to be executed when an error occurs. Takes 1 arg
  error:    (err) => void,

  // a function to be executed when we want the Observable to stop. Takes no args
  complete: () => void
}

Now consider the signature of Observable.create

Observable.create(
  function(observer)
) : Observable {}

So to create an observable, we pass it a reference to a function. This function is not executed right away, but it will be executed when we call .subscribe() on the Observable that create() returned.

To make sense of it, suppose we wanted to create an observable that emits the new value of a text input every time it changes.

let obs = Observable.create(observer => {
  let input = document.querySelector('#myInput');
  input.onchange = (event) => {
    let newVal = event.target.value;
    if (newVal === 'stop') observer.complete();
    else observer.next(newVal);
  };
  // teardown: stop listening when the subscriber unsubscribes
  return () => { input.onchange = null; };
});

All we have at this point is an object obs that is storing a function that will be executed when we call subscribe. Since this function expects a parameter that respects the observer interface, it makes sense that subscribe takes 3 parameters. When we're ready to start listening for value changes:

obs.subscribe(
  // this is the function that will be called from within the Observable
  // when the value changes. Behind the scenes this function is just
  // assigned to observer.next
  newVal => {console.log(newVal)},

  error => {}, // assigned to observer.error

  () => {}     // assigned to observer.complete. Executed if new val is 'stop'
)

Now you see how, each time the onchange event is raised on the input, observer.next(newVal) is called, which is just another name for the first argument of .subscribe()!
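
Both points, that the stored function only runs on subscribe and that the subscribe arguments become the observer, can be seen in a stripped-down sketch. The create function here is a hypothetical stand-in for Observable.create, not the RxJS implementation, and it skips details such as teardown and safety checks.

```javascript
// Hypothetical stand-in for Observable.create: it only stores the function.
function create(subscribeFn) {
  return {
    subscribe(next, error, complete) {
      // The three callbacks become the observer handed to the stored function.
      return subscribeFn({ next, error, complete });
    }
  };
}

const log = [];

const numbers = create(observer => {
  log.push('producer started');
  observer.next(1);
  observer.next(2);
  observer.complete();
});

log.push('created, nothing has run yet');

numbers.subscribe(
  value => log.push('next ' + value),
  err => log.push('error ' + err),
  () => log.push('complete')
);

console.log(log);
// [ 'created, nothing has run yet', 'producer started',
//   'next 1', 'next 2', 'complete' ]
```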

In my opinion, what makes Observables awesome is how they can be chained, or composed. Maybe I'm not interested in learning about all value changes, but only those that have a length greater than 3. Easy. Instead of subscribing to the original obs, I apply the .filter() operator:

obs.filter(newVal => newVal.length > 3).subscribe('...')

Notice that what was passed is a function. The end result will be that only the outputs that pass the filter will reach .subscribe(). How is that done? Something like this:

class Observable {
  filter(filterFunc) {
    // create a new observable. This is now what your code will subscribe to.
    // the original observable is now upstream, and accessed below
    return Observable.create(observer => {
      // subscribe to the original observable so we can see input value changes,
      // and return that subscription so unsubscribing also tears down upstream
      return this.subscribe(
        // pass on the value only if it makes it through the filter
        newVal => { if (filterFunc(newVal)) observer.next(newVal); },
        // errors flow downstream
        error => observer.error(error),
        // complete as soon as the upstream observable completes
        () => observer.complete()
      );
    });
  }
}

Again, nothing magical. The .filter() operator created a new observable that subscribed to the original observable. When the original emits, the new one receives the value and transforms it in some way (here, by dropping values that fail the filter) before passing its own result to its observer. So when you do obs.filter(filterFunc).subscribe(), you're receiving the output of the second observable. It's the number and flexibility of these operators, paired with the notify-me-when design paradigm, that make Observables so powerful.
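
For a version of that idea you can actually run, here is a minimal sketch. TinyObservable is a hypothetical toy class, not the RxJS Observable, and it leaves out unsubscription and error safety.

```javascript
// Hypothetical toy class illustrating the operator pattern described above.
class TinyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  static create(subscribeFn) {
    return new TinyObservable(subscribeFn);
  }

  subscribe(next, error = () => {}, complete = () => {}) {
    return this.subscribeFn({ next, error, complete });
  }

  filter(predicate) {
    // The new observable subscribes to `this` (the upstream observable)
    // only when someone subscribes to it.
    return TinyObservable.create(observer =>
      this.subscribe(
        value => { if (predicate(value)) observer.next(value); },
        err => observer.error(err),
        () => observer.complete()
      )
    );
  }
}

const words = TinyObservable.create(observer => {
  ['hi', 'hello', 'rx', 'observable'].forEach(word => observer.next(word));
  observer.complete();
});

const received = [];
let completed = false;

words
  .filter(word => word.length > 3)
  .subscribe(word => received.push(word), undefined, () => { completed = true; });

console.log(received); // [ 'hello', 'observable' ]
```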

I recommend this video by the architect of RxJS 5. If you only have a few minutes, look at this post by the same architect.


By anonymous    2018-01-22

Found this https://www.youtube.com/watch?v=3LKMwkuK0ZE&t=1219s in the other thread, must watch! Now I understand the gist of `catch`


By anonymous    2018-02-18

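An observable stream stops emitting values once it completes or an error happens. So the catch has to sit on the inner HTTP observable inside switchMap: a failed request is then replaced by an empty observable, and the outer valueChanges stream keeps listening.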

this.searchForm.controls.SearchValue.valueChanges
      .debounceTime(1000)
      .filter(val => val)
      .switchMap(term => {
          return this._http.get(CONFIG.API_ENDPOINT + `/folders/search/` + term)
            .catch(err => {
                console.log(err);
                return Observable.empty()
            })
      })
      .subscribe(result => {
          this.searchResult = result;
      })

You can play with the code here: https://stackblitz.com/edit/angular-http-r1qe3z (enter numbers). This video was very helpful: https://www.youtube.com/watch?v=3LKMwkuK0ZE 26:20 is where it's explained.
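
Why the position of the catch matters can be sketched without RxJS. Everything below is a hypothetical toy: create, empty, catchError, flatMap and search are made-up helpers, and flatMap is a simplified stand-in for switchMap that does not cancel the previous inner observable.

```javascript
// Hypothetical minimal observable. After error or complete it delivers nothing more.
function create(producer) {
  return {
    subscribe(observer) {
      let stopped = false;
      producer({
        next(value) { if (!stopped) observer.next(value); },
        error(err) { if (!stopped) { stopped = true; observer.error(err); } },
        complete() { if (!stopped) { stopped = true; observer.complete(); } }
      });
    }
  };
}

const empty = () => create(observer => observer.complete());

// Like .catch(): on error, continue with the observable the handler returns.
const catchError = (source, handler) => create(observer =>
  source.subscribe({
    next: value => observer.next(value),
    error: err => handler(err).subscribe(observer),
    complete: () => observer.complete()
  })
);

// Simplified stand-in for switchMap (no cancellation of earlier inners).
const flatMap = (source, project) => create(observer =>
  source.subscribe({
    next: value => project(value).subscribe({
      next: inner => observer.next(inner),
      error: err => observer.error(err), // an inner error ends the outer stream
      complete: () => {}                 // an inner completion does not
    }),
    error: err => observer.error(err),
    complete: () => observer.complete()
  })
);

// Fake search request that fails for the term 'bad'.
const search = term => create(observer => {
  if (term === 'bad') {
    observer.error(new Error('request failed'));
  } else {
    observer.next('results for ' + term);
    observer.complete();
  }
});

const terms = create(observer => {
  ['a', 'bad', 'b'].forEach(term => observer.next(term));
  observer.complete();
});

function run(project) {
  const seen = [];
  flatMap(terms, project).subscribe({
    next: value => seen.push(value),
    error: () => seen.push('stream died'),
    complete: () => seen.push('done')
  });
  return seen;
}

const withoutCatch = run(term => search(term));
const withCatch = run(term => catchError(search(term), () => empty()));

console.log(withoutCatch); // [ 'results for a', 'stream died' ]
console.log(withCatch);    // [ 'results for a', 'results for b', 'done' ]
```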

