火花结构化的流执行者怪异的行为



使用火花结构化流和cloudera解决方案我正在使用3个执行者,但是当我启动应用程序时,使用的执行程序仅是一个。我如何使用多个执行者?

让我给你更多的信息。这是我的参数:

命令启动:

spark2-submit --master yarn 
--deploy-mode cluster 
--conf spark.ui.port=4042 
--conf spark.eventLog.enabled=false 
--conf spark.dynamicAllocation.enabled=false 
--conf spark.streaming.backpressure.enabled=true 
--conf spark.streaming.kafka.consumer.poll.ms=512 
--num-executors 3 
--executor-cores 3 
--executor-memory 2g 
--jars /data/test/spark-avro_2.11-3.2.0.jar,/data/test/spark-streaming-kafka-0-10_2.11-2.1.0.cloudera1.jar,/data/test/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar 
--class com.test.Hello /data/test/Hello.jar

代码:

val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", <topic_list:9092>)
      .option("subscribe", <topic_name>)
      .option("group.id", <consumer_group_id>)
      .load()
      .select($"value".as[Array[Byte]], $"timestamp")
      .map((c) => { .... })
val query = lines
      .writeStream
      .format("csv")
      .option("path", <outputPath>)
      .option("checkpointLocation", <checkpointLocationPath>)
      .start()
query.awaitTermination()

在SparkUI中结果:Sparkui图像

我期望所有执行者都在工作。

有什么建议?

谢谢Paolo

看起来您的配置没有错,只是您使用的分区可能只是一个。您需要增加Kafka生产商中的分区。通常,分区大约是执行人人数的3-4倍。

如果您不想触摸生产者代码,则可以在应用地图方法之前进行repartition(3(,因此每个执行程序都可以在其自己的逻辑分区上工作。

如果您仍然希望明确提及每个执行人所做的工作,则可以做mapperpartion方法。

相关内容

  • 没有找到相关文章

最新更新