The question is simple: how can I use redux-observable with an EventSource?
With RxJS it's like:
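const observable = Observable.create(observer => {
  const eventSource = new EventSource('/model-observable');
  // Forward server messages and connection errors to the subscriber.
  eventSource.onmessage = event => observer.next(event.data);
  eventSource.onerror = err => observer.error(err);
  // Teardown: close the connection on unsubscribe.
  return () => {
    eventSource.close();
  };
});
observable.subscribe({
  next: data => {
    this.zone.run(() => this.someStrings.push(data));
  },
  error: err => console.error('something wrong occurred: ' + err)
});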
edited Dec 6, 2016 at 0:10 by jayphelps
asked Dec 5, 2016 at 20:58 by MatCas
1 Answer
This sounds more like a general RxJS question about how to connect to an EventSource. This can be done in a number of ways. If all you care about are the messages (and not errors/open):
import { Observable } from 'rxjs/Observable';
import { fromEvent } from 'rxjs/observable/fromEvent';

const fromEventSource = url => {
  return new Observable(observer => {
    const source = new EventSource(url);
    const message$ = fromEvent(source, 'message');
    const subscription = message$.subscribe(observer);
    // Teardown: stop listening, then close the connection.
    return () => {
      subscription.unsubscribe();
      source.close();
    };
  });
};
If you care about open and/or errors, it requires a little more code to pipe everything together:
import { Observable } from 'rxjs/Observable';
import { Subscriber } from 'rxjs/Subscriber';

const fromEventSource = (url, openObserver) => {
  return new Observable(observer => {
    const open = new Subscriber(openObserver);
    const source = new EventSource(url);

    // Notify openObserver once the connection is established.
    const onOpen = event => {
      open.next(event);
      open.complete();
    };

    const onError = event => {
      // readyState lives on the EventSource, not on the error event.
      if (source.readyState === EventSource.CLOSED) {
        observer.complete();
      } else {
        observer.error(event);
      }
    };

    const onMessage = event => {
      observer.next(event.data);
    };

    source.addEventListener('open', onOpen, false);
    source.addEventListener('error', onError, false);
    source.addEventListener('message', onMessage, false);

    // Teardown: detach every listener, then close the connection.
    return () => {
      source.removeEventListener('open', onOpen, false);
      source.removeEventListener('error', onError, false);
      source.removeEventListener('message', onMessage, false);
      source.close();
    };
  });
};

fromEventSource('http://some-url.')
  .subscribe(value => console.log(value));
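The listener wiring and teardown in fromEventSource can be exercised without a server or RxJS. The following is a minimal sketch, assuming a runtime with global EventTarget and Event (a modern browser or a recent Node). FakeEventSource, connect, and messageEvent are hypothetical names introduced only for this illustration, and connect is a reduced version of the wiring that handles only message and error:

```javascript
// Stand-in for the browser's EventSource, built on the standard EventTarget.
// FakeEventSource is hypothetical and exists only for this sketch.
class FakeEventSource extends EventTarget {
  constructor(url) {
    super();
    this.url = url;
    this.readyState = 0; // CONNECTING
  }
  close() {
    this.readyState = 2; // CLOSED
  }
}

// Reduced version of the listener wiring, without the Observable wrapper.
// Returns the teardown function that the Observable would return.
const connect = (source, observer) => {
  const onMessage = event => observer.next(event.data);
  const onError = event => observer.error(event);
  source.addEventListener('message', onMessage, false);
  source.addEventListener('error', onError, false);
  return () => {
    source.removeEventListener('message', onMessage, false);
    source.removeEventListener('error', onError, false);
    source.close();
  };
};

// Builds a message-like event; a real EventSource delivers a MessageEvent.
const messageEvent = data => Object.assign(new Event('message'), { data });

const received = [];
const source = new FakeEventSource('/model-observable');
const teardown = connect(source, {
  next: data => received.push(data),
  error: () => {}
});

source.dispatchEvent(messageEvent('first'));
teardown();
// The listener has been removed, so this event is not delivered.
source.dispatchEvent(messageEvent('ignored'));
```

Once teardown has run, the second dispatch no longer reaches the observer and the stand-in reports the closed state, which is the behavior an unsubscribe should produce.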
Usage in redux-observable would be something like this:
const somethingEpic = action$ =>
  action$.ofType(SOMETHING)
    .mergeMap(() =>
      fromEventSource('http://some-url.')
        .map(message => ({
          type: MESSAGE,
          payload: message
        }))
        .catch(e => Observable.of({
          type: SOMETHING_ERROR,
          payload: e,
          error: true
        }))
    );