有必要在暴风雨中确认一个元组吗



这似乎令人困惑,在我看到的一些例子中,元组上的ack在每个螺栓中都被调用,而在一些地方则不是这样。这方面的做法是什么?它的含义是什么?

在互联网上搜索了这个答案后,我从文档中找到了这个链接,在这方面真的很有帮助。

喷口如何处理消息:

当一个spot从源(比如Kafka或Kestrel队列)获取消息时,它会opens该消息。这意味着消息实际上还没有从队列中取出,而是被放置在"队列"中;挂起";等待消息完成确认的状态。处于挂起状态时,消息将不会发送给队列的其他使用者。此外,如果客户端断开连接,则该客户端的所有挂起消息都会放回队列中。

打开消息时,Kestrel向客户端提供消息的数据以及消息的唯一id。当向SpoutOutputCollector发送元组时,KestrelSpout使用该确切的id作为元组的message id。稍后,当在KestrelSpout上调用ackfail时,KestrelSpoutKestrel发送ackfail消息,message id将消息从队列中删除或重新打开。

何时需要Ack:

每当客户端在元组树中创建新链接(也称为锚定)时,都需要告诉Storm,这是通过发出新元组来完成的。

客户端还需要告诉Storm您何时完成了对单个元组的处理,这是由ack完成的。通过同时做这两件事,Storm可以检测元组树何时被完全处理,并可以适当地确认或失败喷口元组。

在下面的示例中,bolt将包含一个句子的元组拆分为每个单词的元组。通过将输入元组指定为要发出的第一个参数来锚定每个单词元组。由于单词元组是锚定的,如果单词元组未能在下游处理,则稍后将重播树根处的喷口元组。

public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}        
}

相反,如果元组是这样发出的:

_collector.emit(new Values(word));

以这种方式发出单词tuple会导致它被取消固定。如果在下游处理元组失败,则不会重播根元组。根据拓扑中所需的容错保证,有时发出未绑定的元组是合适的。

何时不需要Ack:

在许多情况下,bolt遵循一种常见的模式,即读取输入元组,基于它发出元组,然后在execute方法结束时对元组进行acking。这些螺栓分为过滤器和简单功能两类。Storm有一个名为BasicBolt的接口,它为您封装了此模式。

以下是SplitSentence的示例,它可以写成BasicBolt,如下所示:

public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}        
}

此实现比以前的实现更简单,并且在语义上完全相同。发送到BasicOutputCollector的元组会自动锚定到输入元组,并且在execute方法完成时会自动为您确认输入元组。

编辑

如前所述,从这里可以看出,IBasicBolt为您负责确认,因此无论什么类实现IBasicBolt:

/**
* Process the input tuple and optionally emit new tuples based on the input tuple.
* 
* All acking is managed for you. Throw a FailedException if you want to fail the tuple.
*/

BaseBasicBolt和BaseRichBolt都实现IBasicBolt

最新更新