我正在使用火花流(直接流方法(从 Kafka 读取大约 1M 条/批记录,并对数据进行一些分析,处理它大约需要 13-15 分钟。
因此,为了稳定系统,我想更改 kafka 参数中的 kafka 属性 'max.poll.interval.ms',以允许轮询在 15 分钟后发生。
var kafkaParams = Map(
..
..
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"max.poll.interval.ms" -> (900000: java.lang.Integer)
)
但是当我检查日志时,它说:
警告消费者配置:提供了配置 max.poll.interval.ms = 900000,但不是已知配置。
这与我无法使用此属性的卡夫卡版本有关。我使用的是卡夫卡版本(0.10.1.0(。
任何帮助将不胜感激。
谢谢!
它不是使用者属性的一部分。您可以在Spark conf中尝试 spark.streaming.kafka.consumer.poll.ms。
Tl; dr.
只需在您的pom中添加一个工件.xml就可以了。
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
完整答案
参数max.poll.interval.ms
是从 Kafka 0.10.1 (KIP-62( 开始添加的。
从 Kafka 0.10.1 开始,检测信号由单独的线程发送,这大大减少了不必要的重新平衡。强烈建议从 0.10.1 开始使用 kafka 客户端。
根据 Sathvik Vutukuri 在 DZone 上发布的 Spark 3.0.0 示例,显式添加特定版本的 kafka 客户端工件(在依赖项列表中,即pom.xml
/build.sbt
( 将"强制"实例实例化正确版本的 kafka 客户端。
更多
随着时间的推移,卡夫卡不断发展。始终建议使用已部署/源/目标 kafka 群集的匹配/兼容客户端版本。
在以下位置查看完整的工件列表:
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients