环境:Hadoop2.75.+FLink1.4+Kafka0.10
我设置了一个实时数据处理项目。我使用 Flink Table source API (Kafka010JsonTableSource) 作为 tablaSource。从 kafka 获取数据,然后执行 SQL 并最终输出到 kafka 主题。这是一个清晰的流程,但是当我在 Flink 集群上执行它时遇到了异常,下面是我的主要代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
env.enableCheckpointing(5000)
val tableEnv = TableEnvironment.getTableEnvironment(env)
val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic(kafkaConfig.topic)
.withKafkaProperties(props)
.withSchema(dynamicJsonSchema)
.withRowtimeAttribute(
enventTimeFieldName,
new ExistingField(enventTimeFieldName),
new MyBoundedOutOfOrderTimestamps(100L))
.build()
tableEnv.registerTableSource(tableName, tableSource)
val tableResult:Table = tableEnv.sqlQuery(sql)
tableResult.writeToSink(new Kafka010JsonTableSink(kafkaOutput.topic, props))
我已经启用了检查点。我第一次在 flink 上执行时,我只是遵循消费者的默认配置。在 Flink 任务运行后,我用 kafka shell 命令(kafka-consumer-groups.sh)检查了偏移量,发现了一个奇怪的情况。根据 Flink 任务管理器的 shell 命令输出和日志,我发现偏移量在几秒钟开始时成功提交,但后来我继续遇到许多异常,如下所示:
块引用 2018-01-19 09:24:03,174 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - 向 Kafka 提交偏移量失败。这不会影响 Flink 的检查点。 org.apache.kafka.clients.consumer.CommitFailedException:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247) 2018-01-19 09:24:03,174 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -异步 Kafka 提交失败。org.apache.kafka.clients.consumer.CommitFailedException:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247)
所以我根据上述错误信息搜索解决方案。有人告诉我应该增加 session.timeout.ms,然后我遵循它,但仍然失败了。之后,我尝试使用以下多种组合配置进行测试,kafka 偏移量总是在开始时成功提交,但稍后将提交失败。我真的不知道解决它,你能帮我解决它吗?非常感谢!!!!!
kafka 使用者配置组合如下所示:
{
"propertyKey": "session.timeout.ms",
"propertyValue": "300000"
},
{
"propertyKey": "request.timeout.ms",
"propertyValue": "505000"
},
{
"propertyKey": "auto.commit.interval.ms",
"propertyValue": "10000"
},
{
"propertyKey": "max.poll.records",
"propertyValue": "50"
},
{
"propertyKey": "max.poll.interval.ms",
"propertyValue": "500000"
},
{
"propertyKey": "client.id",
"propertyValue": "taxi-client-001"
},
{
"propertyKey": "heartbeat.interval.ms",
"propertyValue": "99000"
}
我尝试将上述配置更改为各种值,但都失败了,即使我根据 kafka 官方文档指南配置它们。我希望你能帮助修复上面的错误,非常感谢!!
我得到了根本原因。重新平衡错误总是发生的原因是两个消费者(一个是消费者输入数据,另一个是消费者输出数据)的组名相同。我怀疑只有一个协调器没有足够的能力来处理两个使用者的偏移提交操作。在我更改了一个消费者的组名后,世界突然安静了下来。错误从未发生。