使用火花流时找不到卡夫卡的组信息



我有以下简单的火花流进度,它使用来自 kafka 主题的消息test组 IDfeature1并打印结果。但是,当我运行bin/kafka-consumer-groups.sh --bootstrap-server zookeeper-1:9092 --list列出所有组时,没有feature1或任何包含feature1的内容。怎么了?

我的火花版是2.1.2,卡夫卡版是2.12-2.0.0,动物园管理员版是3.4.13。我在这里发现了一些与之相关的问题 https://github.com/yahoo/kafka-manager/issues/207,但我不知道天气在我的情况下的问题与问题有关。

# coding=utf8
import sys
import datetime
import time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
spark_conf = SparkConf()
spark_conf.set('spark.streaming.kafka.maxRatePerPartition', 1)
sc = SparkContext("local[2]", "NetworkWordCount", conf=spark_conf)
ssc = StreamingContext(sc, 10)
# Create a DStream that will connect to hostname:port, like localhost:9999
kafka_params = {
"bootstrap.servers":"zookeeper-1:9092",
"group.id":"feature1",
"auto.offset.reset":"smallest",
"session.timeout.ms":"60000",
"request.timeout.ms":"100000",
}
lines = KafkaUtils.createDirectStream(ssc, ["test"], kafka_params)
# lines = KafkaUtils.createStream(ssc, 'zookeeper-1:2181', 'feature1', {'new-one':1})
lines.pprint()
ssc.start()
ssc.awaitTermination()

组列表的输出如下,sudo 不会更改任何内容。

console-consumer-9215
console-consumer-41888
console-consumer-32417
console-consumer-35073
console-consumer-66656

还有一个奇怪的现象,feature1出现在动物园管理员/consumers目录中,而控制台-消费者-*组则没有。

下面的代码片段是 kafka 脚本在后端运行以获取消费者组的内容。试试这个来打印消费者组,看看你的组是否正在打印(注意"过滤器"(,它在 Scala 中。

卡夫卡版本:1.0.0 , 斯卡拉版本: 2.12.0

import kafka.admin.AdminClient
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers","<kafka-bootstrap>:9092")
AdminClient.create(props).listAllConsumerGroupsFlattened().map(_.groupId).filter(_.contains("mx-")).mkString(";").split(";").foreach(println(_))
}

最新更新