如何在转换不是1:1而是1:many时创建Spark DataSet



我正在编写一个结构化的流式Spark应用程序,从Kafka队列中读取并处理收到的消息。我想要的最终结果是一个DataSet[MyMessage](其中MyMessage是一个自定义对象(,我想将它排队到另一个Kafka主题。问题是,来自消费者Kafka队列的每个输入消息都可以产生多个MyMessage对象,因此转换不是1:1,1:Many。

所以我在做

val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.option("failOnDataLoss", false)
.option("startingOffsets", "offset1")
.load()
.select($"value")
.mapPartitions{r => createMessages(r)}
val createMessages(row: Iterator[Row]): List[MyMessage] = {
// ...
}

显然,messagesDataSetDataSet[List[MyMessage]]。有没有办法让我只得到一个DataSet[MyMessage]

或者有没有一种方法可以取一个DataSet[List[MyMessage]],然后将每个MyMessage对象写入另一个Kafka主题?(这毕竟是我的最终目标(

尝试

messagesDataSet.flatMap(identity)

您可以使用mapPartitions创建多个值(因此它的工作方式类似于flatMap(,但您必须返回Iterator:

def createMessages(row: Iterator[Row]): Iterator[MyMessage] = {
row.map(/*...*/) //you need too return iterator here
}

最新更新