RxJS 5 Thinking Reactively | Ben Lesh
By anonymous 2017-09-20
https://www.youtube.com/watch?v=3LKMwkuK0ZE
This video clarified a lot of things for me. Specifically, at the 32:00 mark he mentions .share,
which makes an observable multicast. By default, every subscriber gets its own independent execution of the source. So the fix looks like this:
let rngStream = Rx.Observable
  .interval(1000)
  .map(() => Math.ceil(Math.random() * 100))
  .take(5)
  .share();

// Track the lowest number we've seen.
let minStream = rngStream
  .startWith(100)
  .scan((x, y) => Math.min(x, y));

// Track the highest number we've seen.
let maxStream = rngStream
  .startWith(0)
  .scan((x, y) => Math.max(x, y));

Rx.Observable.zip(rngStream, minStream, maxStream)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.min.js"></script>
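To see why .share changes the result, here is a minimal sketch that does not use RxJS at all. The names coldSource, sharedSource and connect are made up for illustration, and the counter replaces Math.random so the effect is repeatable. Real .share() also reference-counts subscribers and handles completion, which this sketch skips.

```javascript
// Hypothetical stand-ins, not the RxJS API: a "cold" source is just a
// function that runs its producer once per subscriber.
function coldSource(producer) {
  return { subscribe: (next) => producer(next) };
}

// Simplified multicast: run the producer once and fan each value out to
// every subscriber registered before connect() is called.
function sharedSource(producer) {
  const listeners = [];
  return {
    subscribe: (next) => listeners.push(next),
    connect: () => producer((value) => listeners.forEach((fn) => fn(value))),
  };
}

// Deterministic replacement for Math.random: every execution of the
// producer emits a different value.
let executions = 0;
const numberProducer = (next) => {
  executions += 1;
  next(executions * 10);
};

// Two subscribers to the cold source trigger two separate executions.
const coldSeen = [];
const cold = coldSource(numberProducer);
cold.subscribe((v) => coldSeen.push(v));
cold.subscribe((v) => coldSeen.push(v));

// Two subscribers to the shared source observe one single execution.
const sharedSeen = [];
const shared = sharedSource(numberProducer);
shared.subscribe((v) => sharedSeen.push(v));
shared.subscribe((v) => sharedSeen.push(v));
shared.connect();

console.log(coldSeen);   // [ 10, 20 ]
console.log(sharedSeen); // [ 30, 30 ]
```

In the zip example above, the unshared version behaves like coldSeen: rngStream, minStream and maxStream would each draw their own random numbers.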
By anonymous 2017-09-20
As I understand it, Observable.create and a normal JavaScript function look similar to me. So what is the actual difference between them?
They are both just functions. Observables can seem like dark magic, but it comes down to JavaScript objects and functions.
Basically, first we need an observer. This is just an object with three properties:
{
  // a function to be executed when we want the Observable to emit. Takes 1 arg
  next: (val) => void,
  // a function to be executed when an uncaught exception occurs. Takes 1 arg
  error: (err) => void,
  // a function to be executed when we want the Observable to stop. Takes no args
  complete: () => void
}
Now consider the signature of Observable.create:
Observable.create(function(observer)) : Observable
So to create an observable, we pass it a reference to a function. This function is not executed right away; it is executed when we call .subscribe() on the Observable that create() returned.
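A quick way to convince yourself of that laziness is a toy version of create. This is only a sketch: createObservable is a made-up helper, not the RxJS implementation, and it skips unsubscription and error safety.

```javascript
// Hypothetical minimal stand-in for Observable.create, for illustration only.
function createObservable(subscribeFn) {
  return {
    // The stored function runs only when subscribe() is called.
    subscribe(observer) {
      subscribeFn(observer);
    },
  };
}

const lazyLog = [];
const lazy = createObservable((observer) => {
  lazyLog.push('producer started');
  observer.next(1);
  observer.complete();
});

// Creating the observable did not run the producer yet.
lazyLog.push('created');

lazy.subscribe({
  next: (v) => lazyLog.push('next ' + v),
  error: () => {},
  complete: () => lazyLog.push('complete'),
});

console.log(lazyLog);
// [ 'created', 'producer started', 'next 1', 'complete' ]
```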
To make sense of it, suppose we wanted to create an observable that emits the new value of a text input every time it changes.
let obs = Observable.create(observer => {
  let input = document.querySelector('#myInput');
  // Emit the input's new value on every change; typing 'stop' ends the stream.
  input.onchange = (event) => {
    let newVal = event.target.value;
    if (newVal === 'stop') observer.complete();
    else observer.next(newVal);
  };
});
All we have at this point is an object obs that is storing a function that will be executed when we call subscribe. Since this function expects a parameter that respects the observer interface, it makes sense that subscribe takes 3 parameters. When we're ready to start listening for value changes:
obs.subscribe(
  // this is the function that will be called from within the Observable
  // when the value changes. Behind the scenes this function is just
  // assigned to observer.next
  newVal => { console.log(newVal); },
  error => {}, // assigned to observer.error
  () => {}     // assigned to observer.complete. Executed if new val is 'stop'
);
Now you see how, each time the onchange event is raised on the input, observer.next(newVal) is called, which is just another name for the first argument of .subscribe()!
In my opinion, what makes Observables awesome is how they can be chained, or composed. Maybe I'm not interested in learning about all value changes, but only those that have a length greater than 3. Easy. Instead of subscribing to the original obs, I apply the .filter() operator:
obs.filter(newVal => newVal.length > 3).subscribe('...')
Notice that what was passed is a function. The end result will be that only the outputs that pass the filter will reach .subscribe(). How is that done? Something like this:
class Observable {
  filter(filterFunc) {
    // Create a new observable. This is now what your code will subscribe to.
    // The original observable is now upstream, and accessed below.
    return Observable.create(observer => {
      // Subscribe to the original observable so we can see input value changes.
      this.subscribe(
        // Pass on the value only if it makes it through the filter.
        newVal => { if (filterFunc(newVal)) observer.next(newVal); },
        // Errors flow downstream.
        error => observer.error(error),
        // Stop immediately if the upstream observable completes.
        () => observer.complete()
      );
    });
  }
}
Again, nothing magical. The .filter() operator created a new observable that subscribed to the original observable. When the original emits, the new one receives the value and processes it before passing its own result to its observer. So when you do obs.filter(filterFunc).subscribe(), you're receiving the output of the second observable. It's the number and flexibility of these operators, paired with the notify-me-when design paradigm, that make Observables so powerful.
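If you want to run the idea end to end, here is a self-contained sketch. MiniObservable is a made-up teaching class, not the real RxJS Observable: it has no unsubscribe, no error safety, and an array stands in for the text input.

```javascript
// MiniObservable is a hypothetical teaching class, not the RxJS implementation.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  static create(subscribeFn) {
    return new MiniObservable(subscribeFn);
  }

  // The three callbacks are bundled into the observer handed to the producer.
  subscribe(next, error = () => {}, complete = () => {}) {
    this.subscribeFn({ next, error, complete });
  }

  filter(predicate) {
    // New observable that subscribes upstream and forwards matching values.
    return MiniObservable.create((observer) => {
      this.subscribe(
        (value) => { if (predicate(value)) observer.next(value); },
        (err) => observer.error(err),
        () => observer.complete()
      );
    });
  }
}

// Upstream observable: emits four strings, then completes.
const words = MiniObservable.create((observer) => {
  ['hi', 'hello', 'hey', 'world'].forEach((w) => observer.next(w));
  observer.complete();
});

const received = [];
words
  .filter((w) => w.length > 3)
  .subscribe(
    (w) => received.push(w),
    undefined, // fall back to the default no-op error handler
    () => received.push('done')
  );

console.log(received); // [ 'hello', 'world', 'done' ]
```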
I recommend this video by the architect of RxJS 5. If you only have a few minutes, look at this post by the same architect.
By anonymous 2018-01-22
Found this https://www.youtube.com/watch?v=3LKMwkuK0ZE&t=1219s in the other thread, must watch! Now I understand the gist of `catch`
By anonymous 2018-02-18
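An observable stream stops emitting values once it completes or an error happens. To keep the outer stream alive, handle the error on the inner observable inside switchMap, so that only the failed request is replaced with an empty observable: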
this.searchForm.controls.SearchValue.valueChanges
  .debounceTime(1000)
  .filter(val => val)
  .switchMap(term => {
    return this._http.get(CONFIG.API_ENDPOINT + `/folders/search/` + term)
      .catch(err => {
        console.log(err);
        return Observable.empty();
      });
  })
  .subscribe(result => {
    this.searchResult = result;
  });
You can play with the code here: https://stackblitz.com/edit/angular-http-r1qe3z (enter numbers). This video was very helpful: https://www.youtube.com/watch?v=3LKMwkuK0ZE 26:20 is where it's explained.
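Here is a rough, RxJS-free sketch of why the position of the catch matters. Everything in it is made up for illustration: mini, flatMap (a synchronous stand-in for switchMap/mergeMap), catchWith (a stand-in for catch) and fakeGet (a fake request that fails for the term 'bad'). Unsubscription is not modeled; a stopped subscription simply ignores later values.

```javascript
// Hypothetical mini observable with only what this demonstration needs.
function mini(subscribeFn) {
  const self = {
    subscribe(observer) {
      let stopped = false; // after error/complete, nothing else gets through
      subscribeFn({
        next: (v) => { if (!stopped) observer.next(v); },
        error: (e) => { if (!stopped) { stopped = true; observer.error(e); } },
        complete: () => { if (!stopped) { stopped = true; observer.complete(); } },
      });
    },
    // Flatten inner observables; inner errors propagate to the outer observer.
    flatMap(project) {
      return mini((observer) => self.subscribe({
        next: (v) => project(v).subscribe({
          next: (x) => observer.next(x),
          error: (e) => observer.error(e),
          complete: () => {}, // inner completion does not end the outer stream
        }),
        error: (e) => observer.error(e),
        complete: () => observer.complete(),
      }));
    },
    // On error, continue with a fallback observable instead.
    catchWith(fallback) {
      return mini((observer) => self.subscribe({
        next: (v) => observer.next(v),
        error: (e) => fallback(e).subscribe(observer),
        complete: () => observer.complete(),
      }));
    },
  };
  return self;
}

const emptyStream = () => mini((o) => o.complete());
const terms = mini((o) => { ['a', 'bad', 'c'].forEach((t) => o.next(t)); o.complete(); });
const fakeGet = (term) => mini((o) => {
  if (term === 'bad') o.error(new Error('request failed'));
  else { o.next('result:' + term); o.complete(); }
});

// Collect everything a subscriber sees into the given array.
const collect = (into) => ({
  next: (r) => into.push(r),
  error: () => into.push('outer stream errored'),
  complete: () => into.push('complete'),
});

const unguarded = [];
terms.flatMap((t) => fakeGet(t)).subscribe(collect(unguarded));

const guarded = [];
terms.flatMap((t) => fakeGet(t).catchWith(() => emptyStream())).subscribe(collect(guarded));

console.log(unguarded); // [ 'result:a', 'outer stream errored' ]
console.log(guarded);   // [ 'result:a', 'result:c', 'complete' ]
```

Without the inner catch, the failed request kills the whole search stream and the term 'c' is never answered. With it, only the failed request is swallowed.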
By anonymous 2017-09-20
The problem is a subtle but critical misunderstanding of how RxJS works, but fear not, this is very common.
So given your example:
This epic's behavior can be described as filtering out all actions not matching type PING. When an action matches, wait 1000ms and then map that action to a different action { type: PONG }, which will be emitted and then dispatched by redux-observable. If at any time while the app is running someone dispatches an action of type CANCEL, then unsubscribe from the source, which means this entire chain will unsubscribe, terminating the epic.
It might be helpful to see how this looks if you did it imperatively:
You can run this code with a fake store here: http://jsbin.com/zeqasih/edit?js,console
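As a separate, much smaller sketch of the same behavior (not the code from the link), assume a made-up createBus helper standing in for action$. The 1000ms delay is left out so the example stays synchronous; the point is only that a top-level CANCEL tears the listener down for good.

```javascript
// Hypothetical action bus standing in for action$; not the redux-observable API.
function createBus() {
  const listeners = new Set();
  return {
    // Copy the listeners first so removing one during dispatch is safe.
    dispatch: (action) => [...listeners].forEach((fn) => fn(action)),
    // Returns a function that removes the listener again.
    listen: (fn) => { listeners.add(fn); return () => listeners.delete(fn); },
  };
}

const bus = createBus();
const emitted = [];

// The PING -> PONG mapping (delay omitted).
const stopPing = bus.listen((action) => {
  if (action.type === 'PING') emitted.push({ type: 'PONG' });
});

// takeUntil at the top level: a single CANCEL removes every listener.
const stopCancel = bus.listen((action) => {
  if (action.type === 'CANCEL') { stopPing(); stopCancel(); }
});

bus.dispatch({ type: 'PING' });   // a PONG is emitted
bus.dispatch({ type: 'CANCEL' }); // the whole chain unsubscribes
bus.dispatch({ type: 'PING' });   // ignored: nobody is listening anymore

console.log(emitted.length); // 1
```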
Instead, what you usually want to do is insulate the subscriber chain you want to be cancellable from the top-level chain that is supposed to listen indefinitely. Although your example (amended from the docs) is contrived, let's run through it first.
Here we use the mergeMap operator to let us take the matched action and map to another, separate observable chain.
Demo: http://jsbin.com/nofato/edit?js,output
We use Observable.timer to wait 1000ms, then map the value it emits (which happens to be the number zero, but that's not important here) to our PONG action. We also say we want to "take" from the timer source until either it completes normally or we receive an action of type CANCEL.
This isolates the chains because mergeMap will continue to subscribe to the observable you return until it errors or completes. But when that happens, it doesn't itself stop subscribing to the source you applied it to, which is action$.ofType(PING) in this example.
A more real-world example is in the redux-observable docs in the Cancellation section.
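The isolation can be sketched imperatively as well. This is an illustration under assumptions, not redux-observable code: createActionBus stands in for action$, and pendingTimers with flushTimers is a manual replacement for Observable.timer so that "1000ms passing" can be triggered by hand.

```javascript
// Hypothetical action bus standing in for action$; not the redux-observable API.
function createActionBus() {
  const listeners = new Set();
  return {
    dispatch: (action) => [...listeners].forEach((fn) => fn(action)),
    listen: (fn) => { listeners.add(fn); return () => listeners.delete(fn); },
  };
}

// Manual stand-in for Observable.timer: callbacks run when flushTimers() is called.
const pendingTimers = new Set();
const flushTimers = () => {
  const due = [...pendingTimers];
  pendingTimers.clear();
  due.forEach((fn) => fn());
};

const actions = createActionBus();
const pongs = [];

// Outer chain: listens for PING indefinitely and is never torn down.
actions.listen((action) => {
  if (action.type !== 'PING') return;

  // Inner chain: one timer per PING, cancellable on its own.
  const fire = () => { stopCancel(); pongs.push({ type: 'PONG' }); };
  const stopCancel = actions.listen((a) => {
    if (a.type === 'CANCEL') { pendingTimers.delete(fire); stopCancel(); }
  });
  pendingTimers.add(fire);
});

actions.dispatch({ type: 'PING' });
actions.dispatch({ type: 'CANCEL' }); // cancels only the pending inner timer
flushTimers();                        // nothing fires
actions.dispatch({ type: 'PING' });   // the outer chain is still listening
flushTimers();                        // this PONG is emitted

console.log(pongs.length); // 1
```

The CANCEL only affects the inner work that was in flight; the next PING is handled as usual.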
This all may sound confusing, but like most powerful things, once you get it, it'll become intuitive. Ben Lesh does an excellent job of explaining how Observables work in his recent talk, including discussing how operators are a chain of Observables and even about isolating subscriber chains. Even though the talk is at AngularConnect, it's not Angular specific.
As an aside, it's important to note that your epics do not swallow or otherwise prevent actions from reaching the reducers, e.g. when you map an incoming action to another, different action. In fact, when your epic receives an action, it has already been through your reducers. Think of your epics as sidecar processes that listen to a stream of your app's actions but can't prevent normal redux things from happening; they can only emit new actions.
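That ordering can be pictured with a toy store. This is a simplification for illustration, not the redux or redux-observable source: createMiniStore is a made-up helper, and the epic is reduced to a plain function that may return one follow-up action.

```javascript
// Hypothetical mini store; the epic is a plain function here, not a stream.
function createMiniStore(reducer, epic) {
  let state = reducer(undefined, { type: '@@INIT' });
  const order = [];
  function dispatch(action) {
    state = reducer(state, action);    // the reducer always runs first
    order.push('reducer:' + action.type);
    const followUp = epic(action);     // the epic sees the action afterwards
    order.push('epic:' + action.type);
    if (followUp) dispatch(followUp);  // all it can do is emit new actions
  }
  return { dispatch, getState: () => state, order };
}

const countReducer = (state = { pings: 0, pongs: 0 }, action) => {
  if (action.type === 'PING') return { ...state, pings: state.pings + 1 };
  if (action.type === 'PONG') return { ...state, pongs: state.pongs + 1 };
  return state;
};

// Maps PING to PONG, but cannot stop PING from reaching the reducer.
const pingToPong = (action) => (action.type === 'PING' ? { type: 'PONG' } : null);

const miniStore = createMiniStore(countReducer, pingToPong);
miniStore.dispatch({ type: 'PING' });

console.log(miniStore.getState()); // { pings: 1, pongs: 1 }
console.log(miniStore.order);
// [ 'reducer:PING', 'epic:PING', 'reducer:PONG', 'epic:PONG' ]
```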