Is it possible to count events over a period of time and yield the sum once every second in RxJS? I have a continuous never ending stream of events. Every 1 second I would like to get the total number of events over the last 5 minute window. The idea is to use this to populate a realtime graph.
I know how to do this the traditional way but would really like to understand how it's done with reactive programming.
Is it possible to count events over a period of time and yield the sum once every second in RxJS? I have a continuous never ending stream of events. Every 1 second I would like to get the total number of events over the last 5 minute window. The idea is to use this to populate a realtime graph.
I know how to do this the traditional way but would really like to understand how it's done with reactive programming.
Share Improve this question asked Oct 26, 2015 at 12:45 TobyToby 1751 gold badge2 silver badges7 bronze badges 1- I don't know what RxJs is, but if you want to really output sum for the last 5 minutes, you will need a buffer with timestamps, so that you can properly drop values older than 5 minutes. – Tomáš Zato Commented Oct 26, 2015 at 12:48
2 Answers
Reset to default 5Here's how I'd tackle it.
Create one observable that just counts the number of events received and emits this as a running total (via scan
).
Create a second observable which is just the running total delayed by 5 minutes.
Create a third observable which just subtracts the delayed observable from the first observable. This will yield the running total of events that are younger than 5 minutes.
Create a final observable that samples this third observable once per second.
const totalLast5Minutes = eventSource.publish(events => {
const runningTotal = events
.scan((e, total) => total + 1, 0)
.startWith(0);
const totalDelayed5Minutes = runningTotal
.delay(5000 * 60)
.startWith(0);
return Rx.Observable
.bineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});
// only sample the value once per second
Rx.Observable
.interval(1000)
.withLatestFrom(totalLast5Minutes, (interval, total) => total)
.subscribe(total => console.log(`total=${total}`));
Here's another way of doing it that I think may be a little simpler. You can use windowWithTime - see ReactiveX and RxJS doc. You can create windows that overlap, so you can have a 5 minute window of events, followed by another 5 minute window of events that started one second later, and another one second after that, etc.
If you take the count of each of these windows, you'll get a series of counts of 5 minutes of events, one second apart. It would look something like this:
source.windowWithTime(5 * 60000, 1000) // create 5 minute windows, one second apart
.flatMap(window => window.count()) // take each window and get the count once it pletes (after 5 minutes)
.subscribe(count => console.log(count));
I've used this to do exactly what you say - get a rate of events over a window of time in a non-stop stream of events. By having the windows overlap, you're generating a moving average of the activity on the stream.
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744265590a4565850.html
评论列表(0条)