Spark Streaming:如何向我的 DStream 添加更多分区



我有一个火花流应用程序,看起来像这样:

val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)
    kafkaDF.foreachPartition(
      i =>{
        createConnection()
        i.foreach(
          row =>{
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )

而且,我在纱线簇上运行它

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....

当我尝试记录kafkaDF.rdd.partitions.size时,结果大多是"1"或"5"。我很困惑,是否可以控制数据帧的分区数? KafkaUtils.createStream似乎不接受与我想要的 rdd 分区数相关的任何参数。我试过kafkaDF.rdd.repartition( int ),但它似乎也不起作用。

如何在代码中实现更高的并行性?如果我的方法错了,正确的方法是什么?

在 Spark Streaming 中,并行性可以在两个方面实现:(a( 使用者/接收者(在您的例子中是 Kafka 使用者(和 (b( 处理(由 Spark 完成(。

默认情况下,Spark 流式处理将为每个使用者分配一个核心(也称为线程(。因此,如果您需要摄取更多数据,则需要创建更多使用者。每个使用者将创建一个DStream。然后,您可以将 DStreams 联合起来以获得一个大流。

// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B
val combineStream = messageStream1.union(messageStream2)

或者,可以通过对输入流进行重新分区来增加接收器/使用者的数量:

inputStream.repartition(<number of partitions>))

流式处理应用可用的所有剩余内核都将分配给 Spark。

因此,如果您有N个内核(通过spark.cores.max定义(,并且您有C使用者,那么您就剩下N-C可用于Spark的核心。

#Partitions =~  #Consumers x (batch duration / block interval)

块间隔 = 使用者在将其创建的数据推送为 Spark 块(定义为配置 spark.streaming.blockInterval (之前等待的时间。

永远记住,Spark Streaming有两个不断发生的功能。一组读取当前微批处理的线程(使用者(和一组处理上一个微批处理的线程 (Spark(。

有关更多性能调整提示,请参阅此处、此处和此处。

相关内容

  • 没有找到相关文章