我正在试验数据流,我想在其中聚合时间序列数据。reduce有效,但是我找不到将生成的数组返回到另一个流的解决方案。
当我调用reduce上的映射时,我只得到整个数组作为结果。不是数组中的数据。
欢迎任何想法或暗示。
const fs = require('fs')
const highland = require('highland')
const streamAgg = (aggData, parts) => {
if (!aggData[parts.groupBySec]) {
aggData[parts.groupBySec] = {}
aggData[parts.groupBySec]['volume'] = parts.volume
aggData[parts.groupBySec]['start-time'] = parts.timeStamp
aggData[parts.groupBySec]['end-time'] = parts.timeStamp
} else {
aggData[parts.groupBySec]['volume'] += parts.volume
aggData[parts.groupBySec]['end-time'] = parts.timeStamp
}
return aggData
}
highland(fs.createReadStream('./timeseriesdata.csv', 'utf8'))
.split()
.map(line => line.split(','))
.map(parts => ({
timeStamp: parts[0],
timeStampParsed: Date.parse(parts[0]),
groupBySec: Math.floor(Date.parse(parts[0])/1000)*1000,
volume: Number(parts[3]),
}))
.reject(parts => isNaN(parts.timeStampParsed))
.reduce([], streamAgg)
.map(x => x)
.each(x => console.log(x))
似乎您正在缩减为一个数组而不是一个对象(您的reducer函数似乎期望这样(。这项工作:
const fs = require('fs');
const highland = require('highland');
const streamAgg = (aggData, parts) => {
if (!aggData[parts.groupBySec]) {
aggData[parts.groupBySec] = {};
aggData[parts.groupBySec].volume = parts.volume;
aggData[parts.groupBySec]['start-time'] = parts.timeStamp;
aggData[parts.groupBySec]['end-time'] = parts.timeStamp;
} else {
aggData[parts.groupBySec].volume += parts.volume;
aggData[parts.groupBySec]['end-time'] = parts.timeStamp;
}
return aggData;
};
highland(fs.createReadStream('./timeseriesdata.csv', 'utf8'))
.split()
.map(line => line.split(','))
.map(parts => ({
timeStamp: parts[0],
timeStampParsed: Date.parse(parts[0]),
groupBySec: Math.floor(Date.parse(parts[0]) / 1000) * 1000,
volume: Number(parts[3])
}))
.reject(parts => isNaN(parts.timeStampParsed))
.reduce({}, streamAgg)
.doto(console.log)
.done(() => {
process.exit(0);
});