Spark-Kafka-Streaming:偏移管理 - 无法让手动提交工作 (Java)



我们正在使用a javainputdstream< computerRecord< string,string>> 从apache kafka中读取消息(值:json-string)并写信给Elasticsearch。

我们按照Spark流中所述实现了偏移管理-KAFKA集成指南,但现在我们刚刚意识到偏移管理对我们不起作用,并且如果当前当前存在失败,则该流也不会再次读取消息迷你批次。即使我们跳过此行,它也不会再次读取消息:

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

我们将代码分解为以下内容,并期望该流一遍又一遍地读取相同的消息,但事实并非如此:

stream.foreachRDD(recordRDD -> {
   final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();
   if (!recordRDD.isEmpty()) {
      LOGGER.info("Processing some Data: " + recordRDD.rdd().count());
   }
});

消费者config-param enable.auto.commit 设置为false,在初始化Javainputdstream之后,该日志也显示在日志中。我们在测试中嵌入的Kafka经纪人和Dev Stage的Kafka-Server面临着同样的问题。两者目前都以独立模式运行。

我们尝试的是:

  • 经纪人配置:增加offsets.commit.timeout.ms
  • 消费者/流配置:将隔离设置为" read_comment"
  • 消费者/流配置:将auto.offset.reset设置为最早
  • spark:设置spark.streaming。
  • spark:增加spark.streaming.kafka.maxretries的值
  • 流:调整流层化的时间比迷你批次
  • 流:启用检查点
  • 流:更改的位置策略

这一切都没有用,似乎我们搜索了整个Web,找到了问题的原因。似乎流忽略了enable.auto.commit配置,并且在阅读了当前RDD的消息后才提交。无论我们尝试什么,我们的流都只能完全阅读一次。

我缺少任何不同的方法或事实吗?

在进行了更多测试之后,我们发现手动提交只有在实际批处理期间停止/崩溃的情况下才能正常工作。如果流停止并再次开始启动,它将再次消耗失败的数据。

因此,每当我们检测到故障javaStreamingContext.stop(false)时,我们目前正在做的事情直接停止流。在此之后,调度程序再次启动流,该流程验证了该流在监管时期内还活着,如果不是。

这不是一个优雅的解决方案,但首先对我们有用。

最新更新