Apache Flume with a Kafka source, Kafka sink, and memory channel — throws UNKNOWN_TOPIC_OR_PARTITION



I am new to Apache Flume (https://flume.apache.org/). For one of my use cases, I need to move data from a Kafka topic on one cluster (bootstrap: bootstrap1, topic: topic1) to a topic with a different name on another cluster (bootstrap: bootstrap2, topic: topic2). There are other use cases in the same project that Flume fits best, and I need to reuse the same Flume pipeline for this use case as well, even though there are other options for copying from Kafka to Kafka.

I tried the following configurations, with the results described under each option.

#1: telnet source to a Kafka sink (bootstrap2, topic2) --> works perfectly. Configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic2
a1.sinks.k1.kafka.bootstrap.servers = bootstrap2
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#2: Kafka as source (bootstrap1, topic1) and logger as sink --> works perfectly.

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 10
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bootstrap1
a1.sources.r1.kafka.topics = topic1
a1.sources.r1.kafka.consumer.group.id = flume-gis-consumer
a1.sources.r1.backoffSleepIncrement = 1000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#3: Kafka as source (bootstrap1, topic1) and Kafka as sink (bootstrap2, topic2) --> gives the error below with the following configuration.

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 10
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bootstrap1
a1.sources.r1.kafka.topics = topic1
a1.sources.r1.kafka.consumer.group.id = flume-gis-consumer1
a1.sources.r1.backoffSleepIncrement = 1000

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic2
a1.sinks.k1.kafka.bootstrap.servers = bootstrap2
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Error:

(kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:968)] [Producer clientId=producer-1] Error while fetching metadata with correlation id 85 : {topic1=UNKNOWN_TOPIC_OR_PARTITION}

The above error is logged continuously.

Error when the flume-ng command is terminated:

(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:268)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: Could not send event
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:234)
... 3 more

Seeking help from the Stack Overflow community on:

  1. What is wrong with the configuration here? The Kafka topics exist on their respective clusters. (Options 1 and 2 work fine, and I can see messages flowing from source to sink.)
  2. Why is the producer thread trying to produce to the Kafka source topic?

I ran into the same issue today. My case was even worse, because both of my topics were hosted on a single Kafka cluster.

The producer thread in the Kafka sink producing back into the Kafka source topic is indeed misleading.

I fixed the problem by setting allowTopicOverride to false on the Kafka sink.
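Applied to the configuration from option #3 in the question (same agent name a1, same sink name k1 as above), the fix is a single extra property on the sink:

```properties
# Stop the sink from honoring the topic header set by the Kafka source,
# so events always go to the sink's configured topic (topic2) instead of
# looping back to the source topic (topic1)
a1.sinks.k1.allowTopicOverride = false
```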

Quoting the Kafka Sink section of the Flume documentation:

allowTopicOverride: default is true. When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).

topicHeader: when set in conjunction with allowTopicOverride, produces a message into the topic named by the value of the header with this name. Care should be taken when using it in conjunction with the Kafka Source topicHeader property, to avoid creating a loopback.

And in the Kafka Source section:

setTopicHeader: default is true. When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.

So, by default, Apache Flume stores the Kafka source topic of each event in the topicHeader. The Kafka sink then, by default, writes to the topic specified in that topicHeader.
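Given the setTopicHeader property quoted above, an alternative sketch (same agent and source names as in the question; I have not tested this variant myself) is to stop the source from recording the topic header in the first place, rather than disabling the override on the sink:

```properties
# Do not store the consumed topic (topic1) in the event's topic header,
# so the sink falls back to its own configured topic (topic2)
a1.sources.r1.setTopicHeader = false
```

Either approach breaks the loopback; disabling allowTopicOverride on the sink is the more targeted fix when other sinks in the pipeline still need the header.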