Kafka010JsonTableSource 在禁用检查点时不会自动提交 kafka 偏移量



我已经建立了一个本地的Kafka0.10+Flink1.4环境。

我使用以下代码来消费者来自 Kafka 主题的数据:

val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic(kafkaConfig.topic)
.withKafkaProperties(props)
.withSchema(dynamicJsonSchema)
.withRowtimeAttribute(enventTimeFieldName,new ExistingField(enventTimeFieldName), new BoundedOutOfOrderTimestamps(30000L))
.build() 
tableEnv.registerTableSource(tableName, tableSource)
val tableResult:Table = tableEnv.sqlQuery(sql)

执行此代码后,总是发现警告消息:

自动提交偏移量 {taxiData-0=OffsetAndMetadata{offset=728461, metadata=''}} 组 taxiDataGroup 失败: 无法完成提交,因为组已重新平衡并将分区分配给另一个成员。 这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长, 这通常意味着轮询循环花费了太多时间处理消息。 您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。

无论我在 Kafka 中设置了下面这样的属性,它总是显示上面的警告消息。

{
"propertyKey": "enable.auto.commit",
"propertyValue": "true"
},
{
"propertyKey": "session.timeout.ms",
"propertyValue": "250000"
},
{
"propertyKey": "request.timeout.ms",
"propertyValue": "305000"
},
{
"propertyKey": "auto.commit.interval.ms",
"propertyValue": "800000"
},
{
"propertyKey": "max.poll.records",
"propertyValue": "300"
},
{
"propertyKey": "max.poll.interval.ms",
"propertyValue": "300000"
}

我不确定 Kafka010JsonTableSource Flink1.4 是否会自动提交偏移量。但测试结果表明它不会自动提交偏移量。谁能帮忙确认这个问题?或者你能在我的代码中看到任何其他问题吗?

您是否尝试过设置低于经纪商group.max.session.timeout.ms值的session.timeout.ms值?根据 https://github.com/dpkp/kafka-python/issues/746 的说法,这似乎是问题所在。

相关内容

  • 没有找到相关文章

最新更新