Apache Flink KafkaSource doesn't set group.id

I have a simple streaming job configured as follows:

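import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.kafka.common.serialization.StringDeserializer

val config: Configuration = new Configuration()
config.setString("taskmanager.memory.managed.size", "4g")
config.setString("parallelism.default", "4")
// local mini-cluster with the Flink web UI enabled
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
env
  .fromSource(
    KafkaSource.builder[String]
      .setBootstrapServers("node1:9093,node2:9093,node3:9093")
      .setTopics("example-topic")
      //.setProperties(kafkaProps) // didn't work
      .setProperty("security.protocol", "SASL_SSL")
      .setProperty("sasl.mechanism", "GSSAPI")
      .setProperty("sasl.kerberos.service.name", "kafka")
      .setProperty("group.id", "groupid-test")
      //.setGroupId("groupid-test") // didn't work
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setProperty("partition.discovery.interval.ms", "60000") // discover new partitions every 60 s
      .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
      .build,
    WatermarkStrategy.noWatermarks[String],
    "example-input-topic"
  )
  .print
env.execute("asdasd")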

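My Flink version is 1.14.2.

My Kafka runs on Cloudera. Kafka version: 2.2.1-cdh6.3.2

I can consume records from Kafka, but the job does not set the group.id for the topic. Does anyone have any ideas?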

Since Flink 1.14.0, group.id is an optional value. See https://issues.apache.org/jira/browse/FLINK-24051. If you want a group id, you can set your own value. The accompanying PR shows how it was set before this change: https://github.com/apache/flink/pull/17052/files#diff-34b4f8d43271eeac91ba17f29b1332f6e0ff3d15f71003a839eb780fe30fbL56
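A minimal sketch of setting your own group id through the builder on Flink 1.14, assuming the same brokers, topic, and SASL settings as in the question. The object name `KafkaGroupIdSketch`, the helper `consumerProps`, and the 60 s checkpoint interval are hypothetical. The sketch also enables checkpointing: per the Flink Kafka connector documentation, KafkaSource commits consumed offsets back to Kafka when a checkpoint completes, so without checkpointing the group may never show committed offsets on the broker. That this explains the behaviour in the question is an assumption, not something the answer states.

```scala
import java.util.Properties

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaGroupIdSketch {

  // Hypothetical helper: the Kerberos/SASL settings from the question in one place.
  def consumerProps(): Properties = {
    val props = new Properties()
    props.setProperty("security.protocol", "SASL_SSL")
    props.setProperty("sasl.mechanism", "GSSAPI")
    props.setProperty("sasl.kerberos.service.name", "kafka")
    props
  }

  // Builds the source; building does not contact the brokers.
  def buildSource(groupId: String): KafkaSource[String] =
    KafkaSource.builder[String]()
      .setBootstrapServers("node1:9093,node2:9093,node3:9093")
      .setTopics("example-topic")
      .setProperties(consumerProps())
      .setGroupId(groupId) // sets the "group.id" consumer property explicitly
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: offsets are committed for the group only when a checkpoint completes.
    env.enableCheckpointing(60000L)
    env
      .fromSource(buildSource("groupid-test"), WatermarkStrategy.noWatermarks[String](), "example-input-topic")
      .print()
    env.execute("kafka-group-id-sketch")
  }
}
```

After a checkpoint has completed, the group should be visible with committed offsets, for example via Kafka's `kafka-consumer-groups` tool, if the assumption above holds.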
