RXJS-每秒汇总事件并重置扫描

  • 本文关键字:扫描 事件 RXJS- rxjs
  • 更新时间 :
  • 英文 :


我正在尝试创建一个可观察到的可观察到的事件, 一分钟的范围 - 但是我无法将汇总重置。

const events = new Rx.Subject();
// example
// create a reactive stream of BTC completed transactions
// aggregate the highs/lows over one second
const btc = events
    .filter(f => f.product_id === "BTC-USD" && f.type === "done")
    .window(Rx.Observable.interval(1000))
    .mergeAll()
    .scan((acc, i) => {
        //console.log(i);
        let price = i.price;
        if (i.price) {
            if (acc.high === -1 || price > acc.high) acc.high = price;
            if (acc.low === -1 || price < acc.low) acc.low = price;
            acc.last = price;
        }
        return acc;
    }, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() })
    .window(Rx.Observable.interval(1000))
    .map(j=> j.last())
    .mergeAll();
// output the stream of aggregates every second
btc.subscribe(j=>console.log(j));

由于我在聚合累加器中有一个时间戳,我可以说我们正在遵守Windows的聚合。

{ high: '14725.97000000',
  low: '14106.01000000',
  last: '14150.52000000',
  ts: '1514089269250' }
{ high: '17279.27000000',
  low: '14059.87000000',
  last: '14162.09000000',
  ts: '1514089269250' }

如何重置扫描?或通过其他方式实现同一件事?

mergeMap替换第一个mergeAllscan - 在其中执行scan

const btc = events
    .filter(f => f.product_id === "BTC-USD" && f.type === "done")
    .window(Rx.Observable.interval(1000))
    .mergeMap(w => w.scan((acc, i) => {
        let price = i.price;
        if (i.price) {
            if (acc.high === -1 || price > acc.high) acc.high = price;
            if (acc.low === -1 || price < acc.low) acc.low = price;
            acc.last = price;
        }
        return acc;
      }, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() })
    )
    .window(Rx.Observable.interval(1000))
    .map(j=> j.last())
    .mergeAll();

另外,如果您使用reduce而不是scan,则不需要第二个window等,因为reduce仅会发出最后一个值:

const btc = events
    .filter(f => f.product_id === "BTC-USD" && f.type === "done")
    .window(Rx.Observable.interval(1000))
    .mergeMap(w => w.reduce((acc, i) => {
        let price = i.price;
        if (i.price) {
            if (acc.high === -1 || price > acc.high) acc.high = price;
            if (acc.low === -1 || price < acc.low) acc.low = price;
            acc.last = price;
        }
        return acc;
      }, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() })
    );

实际上,如果我正确理解您的任务,我认为我们不需要扫描。尝试此示例:

var openings = Rx.Observable.interval(3000);
// Convert the window to an array
var source = Rx.Observable.timer(0, 100)
    .window(openings)
    .take(3) // restrict values just for example
    .flatMap(function (x) { 
    let arrayedValues = x.toArray();
    //Here you can caluculate high, low, last, ts
    return arrayedValues; 
    });
source.subscribe(
(x) => console.log(x)

plunker

最新更新