我们正在使用a javainputdstream< computerRecord< string,string>> 从apache kafka中读取消息(值:json-string)并写信给Elasticsearch。
我们按照Spark流中所述实现了偏移管理-KAFKA集成指南,但现在我们刚刚意识到偏移管理对我们不起作用,并且如果当前当前存在失败,则该流也不会再次读取消息迷你批次。即使我们跳过此行,它也不会再次读取消息:
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
我们将代码分解为以下内容,并期望该流一遍又一遍地读取相同的消息,但事实并非如此:
stream.foreachRDD(recordRDD -> {
final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();
if (!recordRDD.isEmpty()) {
LOGGER.info("Processing some Data: " + recordRDD.rdd().count());
}
});
消费者config-param enable.auto.commit 设置为false,在初始化Javainputdstream之后,该日志也显示在日志中。我们在测试中嵌入的Kafka经纪人和Dev Stage的Kafka-Server面临着同样的问题。两者目前都以独立模式运行。
我们尝试的是:
- 经纪人配置:增加offsets.commit.timeout.ms
- 消费者/流配置:将隔离设置为" read_comment"
- 消费者/流配置:将auto.offset.reset设置为最早
- spark:设置spark.streaming。
- spark:增加spark.streaming.kafka.maxretries的值
- 流:调整流层化的时间比迷你批次
- 流:启用检查点
- 流:更改的位置策略
这一切都没有用,似乎我们搜索了整个Web,找到了问题的原因。似乎流忽略了enable.auto.commit配置,并且在阅读了当前RDD的消息后才提交。无论我们尝试什么,我们的流都只能完全阅读一次。
我缺少任何不同的方法或事实吗?
在进行了更多测试之后,我们发现手动提交只有在实际批处理期间停止/崩溃的情况下才能正常工作。如果流停止并再次开始启动,它将再次消耗失败的数据。
因此,每当我们检测到故障javaStreamingContext.stop(false)
时,我们目前正在做的事情直接停止流。在此之后,调度程序再次启动流,该流程验证了该流在监管时期内还活着,如果不是。
这不是一个优雅的解决方案,但首先对我们有用。