(xpost GitHub issue)
我是风暴的新手。我找到了有用的节点风暴库,并且已成功提交拓扑,但我无法让我的喷口发出元组流。
Node-storm 的 wordcount
示例工作正常。
我想要一个订阅 websocket 并将任何消息输出为元组的喷口。
这是我到目前为止的尝试。我想我有一些配置错误,因为我知道我的wsEmitter
正在发出future
事件,但我的 Storm UI 显示零喷口发射。
怀疑也许我不应该在喷口函数中绑定侦听器?
此函数是否会被多次调用?(看起来...见 https://github.com/RallySoftware/node-storm/blob/master/lib/spout.js#L4)
sync
实际上做了什么,我应该什么时候使用它?
var storm = require('node-storm');
var wsEmitter = require('./wsEmitter.js')();
wsEmitter.init(); // subscribe to websocket
var futuresSpout = storm.spout(function(sync) {
var self = this;
console.log('subscribing to ws');
wsEmitter.on('future', function(data){ // websocket data arrived
self.emit([data]);
sync();
});
})
.declareOutputFields(["a"]);
原来我有两个问题。首先,我的拓扑没有执行,因为我的一个螺栓(未显示)无法设置.declareOutputFields()
。
其次,我需要延迟从喷口发出的信号,直到主管要求发出一次nextTick()
。我通过缓冲任何传入的消息来做到这一点,直到主管调用喷口:
module.exports = (function(){
var storm = require('node-storm');
var wsEmitter = require('./wsEmitter.js')();
wsEmitter.init();
var queue = [];
var queueEmpty = true;
wsEmitter.on('thing', function(data){
var trade = JSON.parse(data);
trade.timeReported = new Date().valueOf();
queue.push(trade);
queueEmpty = false;
});
return storm.spout(function(sync) {
var self = this;
setTimeout(function(){
if(!queueEmpty){
self.emit([queue.shift()]);
queueEmpty =
( queue.length === 0
? true
: false )
}
sync();
}, 100);
})
.declareOutputFields(['trade'])
})()