When using group-offsets as the scan startup mode, I can't get auto.offset.reset set to latest. I tried the properties.* options as described in the docs - https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position - but the Kafka consumer is still created with auto.offset.reset = none (verified in the Flink logs), and the job fails with: Undefined offset with no reset policy for partitions: ....
CREATE TABLE test (
  id INT,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_topic',
  'properties.group.id' = 'testGroup',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'latest',
  'avro-confluent.url' = 'http://localhost:8081'
)
Flink version: 1.14.0.
StackTrace:
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test_topic-11]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:683)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2420)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1750)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1709)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:375)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:260)
at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more
Am I missing something?
You defined properties.auto.offset.reset in your table definition, but no such property exists in the documentation. What happens if you remove it?
Apart from that, I wonder whether you're also affected by https://issues.apache.org/jira/browse/FLINK-24697
Edit: I misread; the properties.* keys are forwarded. But I wonder what result you get if you set that value to latest while specifying group-offsets for scan.startup.mode.
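If the goal is simply to start from the newest records when the consumer group has no committed offsets yet, one workaround worth trying (a sketch, not a confirmed fix for this Flink version) is to bypass the committed-offset lookup entirely by using the documented latest-offset startup mode:

```sql
-- Sketch: same table as above, but starting from the latest offsets directly,
-- so no auto.offset.reset fallback is needed for partitions without
-- committed group offsets.
CREATE TABLE test (
  id INT,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_topic',
  'properties.group.id' = 'testGroup',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'scan.startup.mode' = 'latest-offset',
  'avro-confluent.url' = 'http://localhost:8081'
)
```

The trade-off is that latest-offset ignores any offsets the group has already committed, so it only suits the first run or cases where resuming from committed offsets doesn't matter.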