Flink - InstanceAlreadyExistsException when migrating to KafkaSource



I am using Flink v1.13.2 and trying to migrate from FlinkKafkaConsumer to KafkaSource. While testing the new KafkaSource, I got the following exception:

2022-04-27 12:49:13,206 WARN  org.apache.kafka.common.utils.AppInfoParser                  [] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=my-kafka-id-7
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:90) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$0(KafkaSource.java:145) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:136) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:61) 
...

Below is the configuration for both the FlinkKafkaConsumer and the KafkaSource:


import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaConsumer {

    private KafkaConsumer() {
    }

    public static Properties getKafkaProp(String kafkaTopicName) {
        Properties properties = new Properties();
        String kafkaBrokerServers;
        properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "...");
        switch (kafkaTopicName) {
            case ...:
                kafkaBrokerServers = "1.2.3.4";
                break;
            case ...:
                kafkaBrokerServers = "3.4.5.6";
                break;
            default:
                kafkaBrokerServers = "6.7.7.9";
                break;
        }
        properties.setProperty("bootstrap.servers", kafkaBrokerServers);
        // Same group id for every topic
        String kafkaGroupId = "my-kafka-id";
        properties.setProperty("group.id", kafkaGroupId);
        properties.setProperty("partition.discovery.interval.ms", "10000");
        return properties;
    }

    public static <T> FlinkKafkaConsumer<T> getKafkaConsumerForFlink(String kafkaTopicName, DeserializationSchema<T> deserializationSchema, Properties properties) {
        FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer<>(
                kafkaTopicName,
                deserializationSchema,
                properties);
        consumer.setStartFromLatest();
        return consumer;
    }

    public static <T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicNames, DeserializationSchema<T> deserializationSchema, Properties properties) {
        return KafkaSource.<T>builder()
                .setTopics(kafkaTopicNames)
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(deserializationSchema)
                .build();
    }
}

public class KafkaStream {

    public DataStream<KafkaObject> getKafkaStream_1(ExecutionParameters executionParameters) {
        KafkaSource<KafkaObject> consumerBinden = KafkaConsumer.getKafkaSourceForFlink("topic-1", new KafkaEntitySerialization<>(KafkaObject.class), KafkaConsumer.getKafkaProp("topic-1"));
        return executionParameters.getFlinkExecutionEnvironment().fromSource(consumerBinden, WatermarkStrategy.noWatermarks(), "topic-1").setParallelism(15).uid({RandomGeneratedString}).disableChaining();
    }

    public DataStream<KafkaObject> getKafkaStream_2(ExecutionParameters executionParameters) {
        KafkaSource<KafkaObject> kafka = KafkaConsumer.getKafkaSourceForFlink("topic-2", new KafkaEntitySerialization<>(KafkaObject.class), KafkaConsumer.getKafkaProp("topic-2"));
        return executionParameters.getFlinkExecutionEnvironment().fromSource(kafka, WatermarkStrategy.noWatermarks(), "topic-2").setParallelism(15).uid({RandomGeneratedString}).disableChaining();
    }
}

I also tried creating the KafkaSource with the snippet below, but it did not work either:

public static <T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicNames, DeserializationSchema<T> deserializationSchema, Properties properties) {
    return KafkaSource.<T>builder()
            .setBootstrapServers(properties.getProperty("bootstrap.servers"))
            .setTopics(kafkaTopicNames)
            .setGroupId(properties.getProperty("group.id"))
            .setProperties(properties)
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(deserializationSchema)
            .build();
}

What could be the problem, and how can I fix it?


Update: the problem was the client.id. I was using the same client.id for different topics. If anyone runs into the same warning, try setting setClientIdPrefix:

public static <T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicName, DeserializationSchema<T> deserializationSchema, Properties properties) {
    return KafkaSource.<T>builder()
            .setTopics(kafkaTopicName)
            .setProperties(properties)
            // a unique prefix per source avoids the duplicate MBean registration
            .setClientIdPrefix(UUID.randomUUID().toString())
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(deserializationSchema)
            .build();
}
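
A random UUID changes on every restart, which can make consumer metrics harder to correlate over time. A minimal alternative sketch (my own variant, assuming each source reads exactly one topic): derive a stable prefix from the group.id and the topic name instead.

// Hypothetical variant: a stable, human-readable client ID prefix per topic
// instead of a random UUID, so MBean names stay unique across sources but
// constant across restarts.
public static <T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicName, DeserializationSchema<T> deserializationSchema, Properties properties) {
    return KafkaSource.<T>builder()
            .setTopics(kafkaTopicName)
            .setProperties(properties)
            // e.g. "my-kafka-id-topic-1" and "my-kafka-id-topic-2"
            .setClientIdPrefix(properties.getProperty("group.id") + "-" + kafkaTopicName)
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(deserializationSchema)
            .build();
}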

The error you listed is a warning, not an exception.

Based on the InstanceAlreadyExistsException coming from the Kafka consumer, I suspect you are using the same client.id for multiple consumers. The suggestion in that thread is to change it to a unique name.
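
As far as I can tell, the KafkaSource derives each consumer's client.id from a client ID prefix that defaults to the group.id, which is why sharing one group.id across topics produces the clash. A minimal sketch of the fix on the properties side, assuming the getKafkaProp helper from the question ("client.id.prefix" is, to my understanding, the property that setClientIdPrefix writes):

// Sketch, assuming the shared getKafkaProp(...) from the question: give
// every topic its own client ID prefix so each consumer registers a
// distinct "kafka.consumer:type=app-info,id=..." MBean.
Properties properties = KafkaConsumer.getKafkaProp("topic-1");
properties.setProperty("client.id.prefix", "my-kafka-id-topic-1");

KafkaSource<KafkaObject> source = KafkaConsumer.getKafkaSourceForFlink(
        "topic-1", new KafkaEntitySerialization<>(KafkaObject.class), properties);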
