我正在尝试创建一个可观察到的可观察到的事件, 一分钟的范围 - 但是我无法将汇总重置。
const events = new Rx.Subject();
// example
// create a reactive stream of BTC completed transactions
// aggregate the highs/lows over one second
const btc = events
.filter(f => f.product_id === "BTC-USD" && f.type === "done")
.window(Rx.Observable.interval(1000))
.mergeAll()
.scan((acc, i) => {
//console.log(i);
let price = i.price;
if (i.price) {
if (acc.high === -1 || price > acc.high) acc.high = price;
if (acc.low === -1 || price < acc.low) acc.low = price;
acc.last = price;
}
return acc;
}, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() })
.window(Rx.Observable.interval(1000))
.map(j=> j.last())
.mergeAll();
// output the stream of aggregates every second
btc.subscribe(j=>console.log(j));
由于我在聚合累加器中有一个时间戳,我可以说我们正在遵守Windows的聚合。
{ high: '14725.97000000',
low: '14106.01000000',
last: '14150.52000000',
ts: '1514089269250' }
{ high: '17279.27000000',
low: '14059.87000000',
last: '14162.09000000',
ts: '1514089269250' }
如何重置扫描?或通过其他方式实现同一件事?
用mergeMap
替换第一个mergeAll
和scan
- 在其中执行scan
:
const btc = events
.filter(f => f.product_id === "BTC-USD" && f.type === "done")
.window(Rx.Observable.interval(1000))
.mergeMap(w => w.scan((acc, i) => {
let price = i.price;
if (i.price) {
if (acc.high === -1 || price > acc.high) acc.high = price;
if (acc.low === -1 || price < acc.low) acc.low = price;
acc.last = price;
}
return acc;
}, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() })
)
.window(Rx.Observable.interval(1000))
.map(j=> j.last())
.mergeAll();
另外,如果您使用reduce
而不是scan
,则不需要第二个window
等,因为reduce
仅会发出最后一个值:
const btc = events
.filter(f => f.product_id === "BTC-USD" && f.type === "done")
.window(Rx.Observable.interval(1000))
.mergeMap(w => w.reduce((acc, i) => {
let price = i.price;
if (i.price) {
if (acc.high === -1 || price > acc.high) acc.high = price;
if (acc.low === -1 || price < acc.low) acc.low = price;
acc.last = price;
}
return acc;
}, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() })
);
实际上,如果我正确理解您的任务,我认为我们不需要扫描。尝试此示例:
var openings = Rx.Observable.interval(3000);
// Convert the window to an array
var source = Rx.Observable.timer(0, 100)
.window(openings)
.take(3) // restrict values just for example
.flatMap(function (x) {
let arrayedValues = x.toArray();
//Here you can caluculate high, low, last, ts
return arrayedValues;
});
source.subscribe(
(x) => console.log(x)
plunker