如何使用 emit() 和 sync() 从风暴喷口输出元组流



(xpost GitHub issue)

我是风暴的新手。我找到了有用的节点风暴库,并且已成功提交拓扑,但我无法让我的喷口发出元组流。

Node-storm 的 wordcount 示例工作正常。

我想要一个订阅 websocket 并将任何消息输出为元组的喷口。

这是我到目前为止的尝试。我想我有一些配置错误,因为我知道我的wsEmitter正在发出future事件,但我的 Storm UI 显示零喷口发射。

怀疑也许我不应该在喷口函数中绑定侦听器?

此函数是否会被多次调用?(看起来...见 https://github.com/RallySoftware/node-storm/blob/master/lib/spout.js#L4)

sync实际上做了什么,我应该什么时候使用它?

var storm = require('node-storm');
var wsEmitter = require('./wsEmitter.js')();
wsEmitter.init();  // subscribe to websocket
var futuresSpout = storm.spout(function(sync) {
  var self = this;
  console.log('subscribing to ws');
  wsEmitter.on('future', function(data){       // websocket data arrived
    self.emit([data]);
    sync();
  });
})
.declareOutputFields(["a"]);

原来我有两个问题。首先,我的拓扑没有执行,因为我的一个螺栓(未显示)无法设置.declareOutputFields()

其次,我需要延迟从喷口发出的信号,直到主管要求发出一次nextTick()。我通过缓冲任何传入的消息来做到这一点,直到主管调用喷口:

module.exports = (function(){
  var storm = require('node-storm');
  var wsEmitter = require('./wsEmitter.js')();
  wsEmitter.init();
  var queue = [];
  var queueEmpty = true;
  wsEmitter.on('thing', function(data){
    var trade = JSON.parse(data);
    trade.timeReported = new Date().valueOf();
    queue.push(trade);
    queueEmpty = false;
  });
  return storm.spout(function(sync) {
    var self = this;
    setTimeout(function(){
      if(!queueEmpty){
        self.emit([queue.shift()]);
        queueEmpty =
        ( queue.length === 0
        ? true
        : false )
      }
      sync();
    }, 100);
  })
  .declareOutputFields(['trade'])
})()

最新更新