与自定义Spark结构化流接收器没有并行性



我正在编写一个自定义Spark结构化流接收器,用于将从Kafka读取的事件写入Google BQ(大查询)。下面是我写的代码:

代码正在编译并成功运行。但是My Sink总是只在一个执行器中运行(总是在驱动程序运行的地方)。我不明白这里的问题。

这是我自定义的大查询接收器的实现。

package bq
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class DefaultSource extends StreamSinkProvider with DataSourceRegister{
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new BQSink(sqlContext, parameters, partitionColumns, outputMode)
}
override def shortName(): String = "bq"
}
class BQSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode) extends Sink {

override def addBatch(batchId: Long, data: DataFrame): Unit = {
val df = data.sparkSession.createDataFrame
(data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)

df.collect().foreach({ row => {
//code that writes the rows to Big Query.
}  
}

这是我的驱动程序

// Reading raw events from Kafka
val inputDF = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
.option("subscribe", "topic")
.option("fetchOffset.numRetries", 5)
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.load()
.selectExpr("value")
.as[Array[Byte]];
// Transforming inputDF to OutputDF
val outputDF = inputDF.map(event => transform(event))

// Writing outputDF events to BQ
val query = outputDF.writeStream
.format("bq")
.option("checkpointLocation",config.getString("checkpointLocation"))
.outputMode(OutputMode.Append())
.start()
//Start Streaming
query.awaitTermination()

即使我的主题有多个分区,我的自定义接收器只在单个执行器中运行

使用df.collect将从执行程序收集所有数据到您的驱动程序。因此,您只看到驱动程序向您的接收器发送数据。

您需要执行df.foreachPartition并使用可在执行器上访问的BQ生产者。您可能会遇到"任务不可序列化"。问题,但你可以看看这里,了解如何在Scala Spark中解决这个问题。

最新更新