How to find the name of the Kafka consumer group that a streaming query uses for the Kafka data source



I am consuming data from a Kafka topic with 3 partitions using Spark Structured Streaming. Since Spark Structured Streaming does not let you explicitly provide a group.id and assigns some random id to the consumer, I tried to check the consumer group ids with the following Kafka command:

./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list
Output:
 spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
 spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
 spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0

Here are my questions:

1) Why are 3 consumer groups created? Is it because of the 3 partitions?

2) Is there any way to get these consumer group names in my Spark application?

3) Even though my Spark application was still running, after a while these group names no longer showed up in the consumer groups list. Is this because all the data had been consumed by the Spark application and there was no more data in that Kafka topic?

4) If my assumption in point 3 is correct, will a new consumer group id be created when new data arrives, or will the name of the consumer group stay the same?

Below is my readStream:

  val inputDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", topic)
    // .option("assign", "{\"" + topic + "\":[0]}")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 60000)
    .load()

I have 3 writeStreams in my application, as shown below:

  val df = inputDf.selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(offset AS INT)", "CAST(timestamp AS STRING)")
  val df1 = inputDf.selectExpr("CAST(partition AS INT)", "CAST(offset AS INT)", "CAST(timestamp AS STRING)")

  // First stream
  val checkpoint_loc1 = "/warehouse/test_duplicate/download/chk1"
  df1.agg(min("offset"), max("offset"))
    .writeStream
    .foreach(writer)
    .outputMode("complete")
    .option("checkpointLocation", checkpoint_loc1)
    .start()

  val result = df.select(
    df1("result").getItem("_1").as("col1"),
    df1("result").getItem("_2").as("col2"),
    df1("result").getItem("_5").as("eventdate"))
  val distDates = result.select(result("eventdate")).distinct

  // Second stream
  val checkpoint_loc2 = "/warehouse/test_duplicate/download/chk2"
  distDates.writeStream
    .foreach(writer1)
    .option("checkpointLocation", checkpoint_loc2)
    .start()

  // Third stream
  val kafkaOutput = result.writeStream
    .outputMode("append")
    .format("orc")
    .option("path", data_dir)
    .option("checkpointLocation", checkpoint_loc3)
    .start()

The streaming query is used only once in the code, and there are no joins.

Execution plan:

== Parsed Logical Plan ==
 StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

1) Why are 3 consumer groups created? Is it because of the 3 partitions?

Certainly not. It is merely a coincidence. You seem to have run the application 3 times, and the topic happens to have 3 partitions.

Let's start over.

I deleted all the consumer groups to make sure we start afresh.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.
$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
// nothing got printed out

I created a topic with 5 partitions.

$ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
Created topic "jacek-five-partitions".
$ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
Topic:jacek-five-partitions PartitionCount:5    ReplicationFactor:1 Configs:
    Topic: jacek-five-partitions    Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 4    Leader: 0   Replicas: 0 Isr: 0

The code I used is as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object SparkApp extends App {
  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  import spark.implicits._
  val q = spark
    .readStream
    .format("kafka")
    .option("startingOffsets", "latest")
    .option("subscribe", "jacek-five-partitions")
    .option("kafka.bootstrap.servers", ":9092")
    .load
    .select($"value" cast "string")
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start
  q.awaitTermination()
}

When I ran the above Spark Structured Streaming application, only one consumer group was created.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0

This makes sense, since all the Spark processing should use as many Kafka consumers as there are partitions, but regardless of the number of consumers there should be only one consumer group per query (otherwise every consumer would receive all the records and you would end up with duplicates).
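That split can be pictured with a toy assignment in plain Scala (a simplification for illustration only, not Kafka's actual partition assignor):

```scala
// Toy round-robin assignment: each of the topic's partitions is owned by exactly
// one consumer of the single group, so no record is delivered twice within the group
def assign(partitions: Int, consumers: Int): Map[Int, Seq[Int]] =
  (0 until partitions).groupBy(_ % consumers).map { case (c, ps) => c -> ps.toSeq }
```

With 5 partitions and 5 consumers every consumer owns exactly one partition, which matches the single spark-kafka-source group shown above.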


2) Is there any way to get these consumer group names in my Spark application?

There is no public API for this, so the answer is no.

You could, however, "hack" Spark and go below the public API down to the internal Kafka consumer that uses this line:

val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

Or even this line, to be exact:

val kafkaOffsetReader = new KafkaOffsetReader(
  strategy(caseInsensitiveParams),
  kafkaParamsForDriver(specifiedKafkaParams),
  parameters,
  driverGroupIdPrefix = s"$uniqueGroupId-driver")

Simply find the KafkaMicroBatchReader of the Kafka data source and ask it for the KafkaOffsetReader, which knows the groupId. That seems doable.
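For illustration, the group names listed in the question follow exactly that template; the double hyphen in some of them simply comes from a negative metadataPath.hashCode. A minimal plain-Scala sketch (GroupIdPattern is a hypothetical helper, not a Spark API) that decomposes such a name:

```scala
// Hypothetical helper (not part of Spark): decomposes a generated group id of
// the form spark-kafka-source-<UUID>-<metadataPath.hashCode>-driver-0.
// A negative hashCode explains the "--" in names such as
// spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
object GroupIdPattern {
  private val Pattern = """spark-kafka-source-([0-9a-f-]{36})-(-?\d+)-driver-0""".r

  def parse(groupId: String): Option[(String, Int)] = groupId match {
    case Pattern(uuid, hash) => Some((uuid, hash.toInt))
    case _                   => None
  }
}
```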


3) Even though my Spark application was still running, after a while these group names no longer showed up in the consumer groups list. Is this because all the data had been consumed by the Spark application and there was no more data in that Kafka topic?

This could be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets, which says:

The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless the user overrides that default and uses a custom retention.


4) Will it create a new consumer group id when new data arrives, or will the name of the consumer group stay the same?

It will stay the same.

Moreover, the consumer group must not be removed while at least one consumer from the group is active.

As the Structured Streaming + Kafka Integration Guide (http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) puts it: "group.id: Kafka source will create a unique group id for each query automatically."
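As an aside (not available at the time of this answer): if you can use Spark 3.0 or later, the Kafka source accepts a groupIdPrefix option (and a kafka.group.id override), which makes the generated group names recognizable without any hacking. A sketch, reusing the brokers and topic values from the question:

```scala
// Spark 3.0+ source option (assumption: upgrading is possible): with groupIdPrefix
// set, generated group ids become <prefix>-<UUID>-... instead of spark-kafka-source-...
val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("groupIdPrefix", "my-streaming-app")
  .load()
```

This is a configuration fragment only; it needs a live SparkSession and Kafka broker to run.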
