我实现了Spark结构化流,对于我的用例,我必须指定起始偏移量。
而且,我有Array[String]
形式的偏移值:
{"topic":"test","partition":0,"starting_offset":123}
{"topic":"test","partition":1,"starting_offset":456}
我想以编程方式将其转换为以下内容,以便我可以将其传递给 Spark。
{"test":{"0":123,"1":456}}
注意:这只是一个示例,我不断获得不同的偏移范围,因此无法对其进行硬编码。
如果array
是包含您描述的列表的变量,那么:
>>> [{d['topic']: [d['partition'], d['starting_offset']]} for d in array]
[{'test': [0, 123]}, {'test': [1, 456]}]
scala> import org.json4s._
scala> import org.json4s.jackson.JsonMethods._
scala> val topicAsRawStr: Array[String] = Array(
"""{"topic":"test","partition":0,"starting_offset":123}""",
"""{"topic":"test","partition":1,"starting_offset":456}""")
scala> val topicAsJSONs = topicAsRawStr.map(rawText => {
val json = parse(rawText)
val topicName = json "topic" // Extract topic value
val offsetForTopic = json "starting_offset" // Extract starting_offset
topicName -> offsetForTopic
})
scala> // Aggregate offsets for each topic
您还可以使用 spark.sparkContext.parallelize API。
scala> case class KafkaTopic(topicName: String, partitionId: Int, starting_offset: Int)
scala> val spark: SparkSession = ???
scala> val topicAsRawStr: Array[String] = Array(
"""{"topic":"test","partition":0,"starting_offset":123}""",
"""{"topic":"test","partition":1,"starting_offset":456}""")
scala> val topicAsJSONs = topicAsRawStr.map(line => json.parse(line).extract[KafkaTopic])
scala> val kafkaTopicDS = spark.sparkContext.parallelize(topicAsJSONs)
scala> val aggregatedOffsetsByTopic = kafkaTopicDS
.groupByKey("topic")
.mapGroups {
case (topicName, kafkaTopics) =>
val offsets = kafkaTopics.flatMap(kT => kT.starting_offset)
(topicName -> offsets.toSet)
}