我有一个火花流应用程序,看起来像这样:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
kafkaDF.foreachPartition(
i =>{
createConnection()
i.foreach(
row =>{
connection.sendToTable()
}
)
closeConnection()
}
)
而且,我在纱线簇上运行它
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
当我尝试记录kafkaDF.rdd.partitions.size
时,结果大多是"1"或"5"。我很困惑,是否可以控制数据帧的分区数? KafkaUtils.createStream
似乎不接受与我想要的 rdd 分区数相关的任何参数。我试过kafkaDF.rdd.repartition( int )
,但它似乎也不起作用。
如何在代码中实现更高的并行性?如果我的方法错了,正确的方法是什么?
在 Spark Streaming 中,并行性可以在两个方面实现:(a( 使用者/接收者(在您的例子中是 Kafka 使用者(和 (b( 处理(由 Spark 完成(。
默认情况下,Spark 流式处理将为每个使用者分配一个核心(也称为线程(。因此,如果您需要摄取更多数据,则需要创建更多使用者。每个使用者将创建一个DStream。然后,您可以将 DStreams 联合起来以获得一个大流。
// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B
val combineStream = messageStream1.union(messageStream2)
或者,可以通过对输入流进行重新分区来增加接收器/使用者的数量:
inputStream.repartition(<number of partitions>))
流式处理应用可用的所有剩余内核都将分配给 Spark。
因此,如果您有N
个内核(通过spark.cores.max
定义(,并且您有C
使用者,那么您就剩下N-C
可用于Spark的核心。
#Partitions =~ #Consumers x (batch duration / block interval)
块间隔 = 使用者在将其创建的数据推送为 Spark 块(定义为配置 spark.streaming.blockInterval
(之前等待的时间。
永远记住,Spark Streaming有两个不断发生的功能。一组读取当前微批处理的线程(使用者(和一组处理上一个微批处理的线程 (Spark(。
有关更多性能调整提示,请参阅此处、此处和此处。