javascript - Count events over a period of time and yield the sum once every second in RxJS - Stack Overflow

Is it possible to count events over a period of time and yield the sum once every second in RxJS? I hav

Is it possible to count events over a period of time and yield the sum once every second in RxJS? I have a continuous never ending stream of events. Every 1 second I would like to get the total number of events over the last 5 minute window. The idea is to use this to populate a realtime graph.

I know how to do this the traditional way but would really like to understand how it's done with reactive programming.

Is it possible to count events over a period of time and yield the sum once every second in RxJS? I have a continuous never ending stream of events. Every 1 second I would like to get the total number of events over the last 5 minute window. The idea is to use this to populate a realtime graph.

I know how to do this the traditional way but would really like to understand how it's done with reactive programming.

Share Improve this question asked Oct 26, 2015 at 12:45 TobyToby 1751 gold badge2 silver badges7 bronze badges 1
  • I don't know what RxJs is, but if you want to really output sum for the last 5 minutes, you will need a buffer with timestamps, so that you can properly drop values older than 5 minutes. – Tomáš Zato Commented Oct 26, 2015 at 12:48
Add a ment  | 

2 Answers 2

Reset to default 5

Here's how I'd tackle it.

Create one observable that just counts the number of events received and emits this as a running total (via scan). Create a second observable which is just the running total delayed by 5 minutes. Create a third observable which just subtracts the delayed observable from the first observable. This will yield the running total of events that are younger than 5 minutes. Create a final observable that samples this third observable once per second.

const totalLast5Minutes = eventSource.publish(events => {
    const runningTotal = events
        .scan((e, total) => total + 1, 0)
        .startWith(0);
    const totalDelayed5Minutes = runningTotal
        .delay(5000 * 60)
        .startWith(0);
    return Rx.Observable
        .bineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});

// only sample the value once per second
Rx.Observable
    .interval(1000)
    .withLatestFrom(totalLast5Minutes, (interval, total) => total)
    .subscribe(total => console.log(`total=${total}`));

Here's another way of doing it that I think may be a little simpler. You can use windowWithTime - see ReactiveX and RxJS doc. You can create windows that overlap, so you can have a 5 minute window of events, followed by another 5 minute window of events that started one second later, and another one second after that, etc.

If you take the count of each of these windows, you'll get a series of counts of 5 minutes of events, one second apart. It would look something like this:

source.windowWithTime(5 * 60000, 1000)   // create 5 minute windows, one second apart
      .flatMap(window => window.count()) // take each window and get the count once it pletes (after 5 minutes)
      .subscribe(count => console.log(count));

I've used this to do exactly what you say - get a rate of events over a window of time in a non-stop stream of events. By having the windows overlap, you're generating a moving average of the activity on the stream.

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744265590a4565850.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信