Flink Kafka SQL set 'auto.offset.reset'



在使用group-offsets作为扫描启动模式时,我无法将auto.offset.reset设置为最新。我尝试过属性。*如文档中所述-https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-读取位置,但仍然使用auto.offset.reset=none创建Kafka消费者组(在Flink日志中验证(,作业失败,错误为:Undefined offset with no reset policy for partitions: ....

CREATE TABLE test (id int, order_time timestamp(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND )
WITH (
'connector' = 'kafka',
'topic' = 'test_topic',
'properties.group.id' = 'testGroup',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'latest',
'avro-confluent.url' = 'http://localhost:8081'
)

Flink版本:1.14.0。

StackTrace:

Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test_topic-11]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:683)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2420)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1750)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1709)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:375)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:260)
at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more

我是不是错过了什么?

您在定义中定义了properties.auto.offset.reset,但文档中不存在此类属性。如果你去掉它会发生什么

除此之外,我想知道你是否也患有https://issues.apache.org/jira/browse/FLINK-24697

编辑:我看错了,properties密钥被转发了。但我想知道,如果您将该值设置为latest,同时为scan.startup.mode指定group-offsets,您会得到什么结果。

相关内容

  • 没有找到相关文章

最新更新