如何在Storm上制作一个同步的KafkaSpout



我正试图让一个Kafka消费者同步消费来自Kafka的消息。

我遇到的实际问题是消息队列存储在Storm Spout中。

我想做的是让Storm等待Kafka ACK的回复,然后让Storm消耗下一条消息。

我正在使用Storm KafkaSpout:

/**
* Creates a configured kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return An instance of configured KafkaSpout
*/
public KafkaSpout getkafkaSpout(String topicName){
return new KafkaSpout(this.getSpoutConfig(topicName));
}
/**
* Create the necessary configuration to create a new kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return Spout configuration
*/
public SpoutConfig getSpoutConfig(String topicName) {
SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(),topicName, "", String.join("-",topicName,RandomStringUtils.randomAlphanumeric(20)));
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
return spoutConfig;
}

builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);

我有Storm 2.0.0的更新,我使用Storm kafka客户端。但是如果我配置Storm队列到50:setMaxSpoutPending(50);当我向Kafka发送许多数据时,它Storm停止消耗它。

我已经用下一个配置配置了Kafka消费者:

KafkaSpoutConfig spoutConf =  KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
.setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") //Set session timeout
.setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") //Set request timeout
.setOffsetCommitPeriodMs(10000)    //Set automatic confirmation time (in ms)
.setFirstPollOffsetStrategy(LATEST)    //Set to pull the latest messages
.setRetry(kafkaSpoutRetryService)
.build();

当Storm消耗与MaxSpoutPending配置相同的50条消息时,它将停止消耗更多的消息。也许下一个螺栓没有正确发送ACK?我使用KafkaConsumerPout:之后的下一个螺栓

public class testBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("MQTTmessage"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector boc) {
System.out.println("nnnnLLEGA BIENN AL SPLIT TEXT BOLTnn");
System.out.println("TUPLE "+tuple);
String text = tuple.getString(4);
List<String> lines = Arrays.asList(text.split("\r?\n"));
lines.forEach(line -> {
boc.emit(new Values(line));
});
}
}

关于节流喷口:是的,可以通过将拓扑配置中的topology.max.spout.pending选项设置为1来实现。如果您想获得良好的吞吐量,我不会真正推荐它,但我认为您已经仔细考虑过为什么需要拓扑以这种方式运行。

关于新的喷口:stream1:9092是Kafka运行的服务器吗?kafkaToStormAlarms是你发送到的主题吗?如果没有,那可能是你的问题。否则,检查storm/logs/workers-artifacts中的工作日志,它可能会告诉你为什么喷口没有发出任何东西。

最后,是的,您绝对应该使用storm-kafka-client而不是storm-kafka,否则您将无法升级到Storm 2.0.0或最新的Kafka版本。

最新更新