André Staltz (@andrestaltz): You will learn RxJS at ng-europe 2016

By: ng-europe

Uploaded on 11/01/2016

Reactive programming with Observables can seem like a hard skill to learn. In this talk you will see André live code and explain the basics of RxJS Observables in a way that will demystify the concepts. We will build our own Observable from scratch, as well as our own basic operators, then see why RxJS can easily solve your async event problems.

Comments (6):

By anonymous    2017-09-20

An Observable is essentially just a wrapper for the next(), error() and complete() callbacks. If the functions it executes are synchronous, the Observable will be synchronous too. This is the core functionality of an Observable (the rest is just cleanup):

export class Observable {
  constructor(subscribe) {} // subscribe: the function that produces the values
  subscribe(observerOrNext, error, complete) {} // callbacks
}
Observable.create = (subscribe) => {
  return new Observable(subscribe); // factory, so calls can be dot-chained
};
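
As a rough illustration of the point above, the skeleton can be filled in so that subscribe() simply hands the three callbacks to the producer function. This is a minimal sketch, not the RxJS implementation: TinyObservable, numbers and log are names made up for the example, and cleanup (unsubscription) is left out on purpose.

```javascript
// TinyObservable is a hypothetical, stripped-down stand-in for Rx.Observable.
class TinyObservable {
  constructor(subscribe) {
    // The producer function; it runs once per subscribe() call.
    this._subscribe = subscribe;
  }

  subscribe(observerOrNext, error, complete) {
    // Accept either an observer object or up to three plain callbacks.
    const observer = typeof observerOrNext === 'function'
      ? { next: observerOrNext, error, complete }
      : (observerOrNext || {});
    const noop = () => {};
    return this._subscribe({
      next: observer.next ? observer.next.bind(observer) : noop,
      error: observer.error ? observer.error.bind(observer) : noop,
      complete: observer.complete ? observer.complete.bind(observer) : noop,
    });
  }

  static create(subscribe) {
    return new TinyObservable(subscribe);
  }
}

const log = [];

// A synchronous producer: it calls the callbacks right away.
const numbers = TinyObservable.create(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

log.push('before subscribe');
numbers.subscribe(
  value => log.push('next ' + value),
  err => log.push('error ' + err),
  () => log.push('complete')
);
log.push('after subscribe');

console.log(log);
```

Because the producer is synchronous, all callbacks run before subscribe() returns, so 'after subscribe' is the last entry in the log.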

Watch this video, in which André Staltz constructs an Observable from scratch.

Original Thread

By anonymous    2017-09-20

You have to remember that Promise and Observable are "just" wrappers for callbacks. When you chain then() calls in a Promise chain or use operators in an Observable chain, you are passing a function or a value from one callback to the next.

Consider this fake code:

const fn = (something) => { return other(something); }

Observable.from([1, 2, 3])
  .map(o => fn(o))
  .switchMap(o => Promise.resolve(fn(o)))
  .switchMap(o => Observable.of(fn(o)))

You use switchMap() to get a value asynchronously out of some wrapped callback (either a Promise or an Observable). map() is a synchronous operator, so there you are dealing with the values directly.
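
To make the difference concrete, here is a small dependency-free sketch of both operators. It is an assumption-laden simplification rather than RxJS code: TinyObservable, queries, search and respond are hypothetical names, the inner value is always a TinyObservable (no Promise support), and error/complete handling is omitted.

```javascript
// TinyObservable is a hypothetical minimal Observable, not the RxJS class.
// subscribe() returns a function that stops further deliveries.
class TinyObservable {
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(next) {
    let active = true;
    this.producer(value => { if (active) next(value); });
    return () => { active = false; };
  }

  // map: each value goes through fn synchronously, inside the same callback.
  map(fn) {
    return new TinyObservable(next => this.subscribe(value => next(fn(value))));
  }

  // switchMap (simplified): project returns an inner TinyObservable; only the
  // most recent inner one is listened to, earlier ones are unsubscribed.
  switchMap(project) {
    return new TinyObservable(next => {
      let unsubscribeInner = null;
      this.subscribe(value => {
        if (unsubscribeInner) unsubscribeInner();
        unsubscribeInner = project(value).subscribe(next);
      });
    });
  }
}

// Manually driven sources keep the example synchronous and deterministic.
let pushQuery;
const queries = new TinyObservable(next => { pushQuery = next; });

const respond = {};
const search = query => new TinyObservable(next => { respond[query] = next; });

const seen = [];
queries
  .map(q => q.trim())   // plain value in, plain value out
  .switchMap(search)    // plain value in, wrapped (inner) observable out
  .subscribe(result => seen.push(result));

pushQuery(' rx ');
respond.rx('result for rx');
pushQuery(' rxjs ');
respond.rx('late result for rx'); // ignored: switchMap has already moved on
respond.rxjs('result for rxjs');

console.log(seen);
```

Only 'result for rx' and 'result for rxjs' end up in seen; the late result from the first inner observable is dropped because it was unsubscribed when the second query arrived.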

If you want more details about how things work, watch this awesome talk by André Staltz, "You will learn RxJS".

Original Thread

By anonymous    2017-09-20

If I got it right, you want your epic to produce the following sequence of actions in response to each SUBMIT_LOGIN:

GENERATE_DEVICE_ID -- RESOLVE_LOGIN -- LOAD_ABOUT

Also, I guess that GENERATE_DEVICE_ID needs to be issued immediately after SUBMIT_LOGIN is received, while RESOLVE_LOGIN and LOAD_ABOUT should be issued only after a stream returned by login() emits.

If my guess is correct, then you just need to start the nested observable (the one created for each SUBMIT_LOGIN) with the GENERATE_DEVICE_ID action, and the startWith operator does exactly that:

const submitLoginEpic = action$ =>
    action$.ofType(SUBMIT_LOGIN)
        .mergeMap(({ payload }) =>
            login(payload.email, payload.password)
                .mergeMap(({ response }) => Rx.Observable.of(resolveLogin(response.content), loadAbout()))
                .startWith(generateDeviceId(uuidv1()))
        );

Update: one possible alternative is to use the concat operator: obs1.concat(obs2) subscribes to obs2 only when obs1 has completed.
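
The subscription order that concat gives you can be seen in a small dependency-free sketch. It assumes a made-up TinyObservable with simplified of() and concat() (not the RxJS versions), and the traced helper exists only to record when each source is subscribed to.

```javascript
// TinyObservable is a hypothetical minimal Observable, not the RxJS class.
class TinyObservable {
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(observer) {
    this.producer(observer);
  }

  // Emits the given values synchronously, then completes.
  static of(...values) {
    return new TinyObservable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete();
    });
  }

  // concat (simplified): forward everything from this observable and
  // subscribe to `other` only once this one has completed.
  concat(other) {
    return new TinyObservable(observer => {
      this.subscribe({
        next: value => observer.next(value),
        complete: () => other.subscribe(observer),
      });
    });
  }
}

const events = [];

// Wraps a source so that the moment of subscription is recorded.
const traced = (name, source) => new TinyObservable(observer => {
  events.push('subscribe ' + name);
  source.subscribe(observer);
});

const obs1 = traced('obs1', TinyObservable.of('GENERATE_DEVICE_ID'));
const obs2 = traced('obs2', TinyObservable.of('RESOLVE_LOGIN', 'LOAD_ABOUT'));

obs1.concat(obs2).subscribe({
  next: value => events.push('next ' + value),
  complete: () => events.push('complete'),
});

console.log(events);
```

The recorded events show 'subscribe obs2' appearing only after everything from obs1 has been delivered.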

Note also that if login() needs to be called after GENERATE_DEVICE_ID has been dispatched, you might want to wrap it in a "cold" observable:

const login$ = payload =>
    Rx.Observable.create(observer => {
        return login(payload.email, payload.password).subscribe(observer);
    });

const submitLoginEpic = action$ =>
    action$.ofType(SUBMIT_LOGIN)
        .mergeMap(({ payload }) =>
            Rx.Observable.of(generateDeviceId(uuidv1()))
                .concat(login$(payload).map(({ response }) => resolveLogin(response.content)))
                .concat(Rx.Observable.of(loadAbout()))
        );

This way GENERATE_DEVICE_ID is emitted before login() is called, i.e. the sequence would be

GENERATE_DEVICE_ID -- login() -- RESOLVE_LOGIN -- LOAD_ABOUT

Update 2: The reason login() does not work as expected is that it depends on external state (const state = getCurrentState()), and that state differs between the moment login() is called and the moment the observable returned by login() is subscribed to. AjaxRequest captures the state at the point when login() is called, which happens before GENERATE_DEVICE_ID is dispatched to the store. At that point no network request has been performed yet, but the ajax observable is already configured based on the wrong state.

To see what happens, let's simplify things a bit and rewrite the epic this way:

const createInnerObservable = submitLoginAction => {
    return Observable.of(generateDeviceId()).concat(login());
}

const submitLoginEpic = action$ =>
    action$.ofType(SUBMIT_LOGIN).mergeMap(createInnerObservable);

When a SUBMIT_LOGIN action arrives, mergeMap() first calls the createInnerObservable() function. The function needs to create a new observable, and to do that it has to call generateDeviceId() and login(). When login() is called, the state is still old: at this point the inner observable has not even been created, so there was no chance for GENERATE_DEVICE_ID to be dispatched. Because of that, login() returns an ajax observable configured with old data, and that observable becomes part of the resulting inner observable.

As soon as createInnerObservable() returns, mergeMap() subscribes to the returned inner observable and it starts to emit values. GENERATE_DEVICE_ID comes first, gets dispatched to the store, and the state changes. After that, the ajax observable (which is now part of the inner observable) is subscribed to and performs a network request. But the new state has no effect on it, as the ajax observable has already been initialized with old data.

Wrapping login() in Observable.create postpones the call until the observable returned by Observable.create is subscribed to, and at that point the state is already up to date.
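
The timing difference can be reproduced without RxJS or a real store. The sketch below is only an approximation under stated assumptions: TinyObservable, state, dispatch, deferredLogin and run are hypothetical stand-ins, and this login() merely copies the device id into the emitted action instead of configuring an ajax request.

```javascript
// TinyObservable is a hypothetical minimal Observable, not the RxJS class.
class TinyObservable {
  constructor(producer) { this.producer = producer; }
  subscribe(observer) { this.producer(observer); }
  static of(...values) {
    return new TinyObservable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete();
    });
  }
  // Simplified concat: subscribe to `other` only after this one completes.
  concat(other) {
    return new TinyObservable(observer => {
      this.subscribe({
        next: value => observer.next(value),
        complete: () => other.subscribe(observer),
      });
    });
  }
}

// Stand-in for the store: GENERATE_DEVICE_ID writes the device id to state.
let state = { deviceId: null };
const dispatch = action => {
  if (action.type === 'GENERATE_DEVICE_ID') state = { deviceId: action.deviceId };
};
const generateDeviceId = () => ({ type: 'GENERATE_DEVICE_ID', deviceId: 'device-1' });

// Like the login() in the question, this reads the state when it is *called*,
// not when the returned observable is subscribed to.
const login = () => {
  const deviceIdAtCallTime = state.deviceId;
  return TinyObservable.of({ type: 'RESOLVE_LOGIN', deviceId: deviceIdAtCallTime });
};

// Deferred variant: login() runs only once the wrapper is subscribed to.
const deferredLogin = () =>
  new TinyObservable(observer => login().subscribe(observer));

// Builds the inner observable, dispatches what it emits, and returns the
// device id that the login step ended up using.
const run = makeLogin => {
  state = { deviceId: null };
  const actions = [];
  TinyObservable.of(generateDeviceId())
    .concat(makeLogin())
    .subscribe({
      next: action => { dispatch(action); actions.push(action); },
      complete: () => {},
    });
  return actions[1].deviceId;
};

const eagerResult = run(login);            // login() called while building the chain
const deferredResult = run(deferredLogin); // login() called at subscription time
console.log(eagerResult, deferredResult);
```

In the eager run login() sees the state from before GENERATE_DEVICE_ID was dispatched (null here), while the deferred run sees 'device-1'.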

An alternative could be to introduce an extra epic that reacts to the GENERATE_DEVICE_ID action (or a different one, whichever suits your domain) and sends the login request, e.g.:

const submitLogin = payload => ({ type: "SUBMIT_LOGIN", payload });

// SUBMIT_LOGIN_REQUESTED is what used to be called SUBMIT_LOGIN
const submitLoginRequestedEpic = action$ =>
    action$.ofType(SUBMIT_LOGIN_REQUESTED)
        .mergeMap(({ payload }) => Rx.Observable.of(
            generateDeviceId(uuidv1()),
            submitLogin(payload))
        );

const submitLoginEpic = (action$, store) =>
    action$.ofType(SUBMIT_LOGIN)
        .mergeMap(({ payload }) => {
            // explicitly pass all the data required to login
            const { token, deviceId } = store.getState().user;
            return login(payload.email, payload.password, token, deviceId)
                .map(({ response }) => resolveLogin(response.content))
                // concat() expects an observable, so wrap the action
                .concat(Rx.Observable.of(loadAbout()));
        });

Learning Resources

As redux-observable is based on RxJS, it makes sense to get comfortable with Rx first.

I highly recommend watching the "You will learn RxJS" talk by André Staltz. It should give you an intuition for what observables are and how they work under the hood.

André has also authored these remarkable lessons on egghead:

Jay Phelps has also given a brilliant talk on redux-observable; it is definitely worth watching.

Original Thread
