我在4个CPU内核和8个线程上使用Spark 2.4.0和Scala 2.11。
我写了以下应用程序:
package demos.spark
object WordCounter {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.master("local[4]")
.getOrCreate
import spark.implicits._
spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load
.as[String]
.flatMap(_.split("\W+"))
.groupBy("value")
.count
.writeStream
.outputMode("complete")
.format("console")
.start
.awaitTermination
}
}
使用local[1]
的应用程序的处理时间约为60秒。对于local[8]
,它下降到约15秒,这是我得到的最小值。
我总是通过套接字发送一两句话作为输入。
这是一种预期行为吗?如何优化应用程序以获得1秒的处理时间?
编辑:在这个问题上花了很长时间之后,我终于找到了解决办法。问题在于Spark默认使用的分区太多(几百个)。在添加spark.sql.shuffle.dartitions选项设置为8(我机器上的内核数量)后,数据处理的持续时间已降至300-400毫秒
val spark = SparkSession
.builder
.master("local[*]")
.config("spark.sql.shuffle.partitions", 8)
.getOrCreate
我还不知道,这个数字是否应该是恒定的,如果Spark应用程序将在可能发生变化的基础设施(Spark、Kubernetes、AWS、自动缩放)上运行呢?
4个CPU内核和8个线程。
使用local[*]
和Spark将使用与核心数量一样多的处理线程,即4。如果这8个线程是虚拟核;CPU核心";因此8是用于处理的线程的最大数目。
这正是你的测试所证明的,即
对于
local[8]
,它下降到约15秒,这是我得到的最小值。这是一种预期行为吗?
是的,除非您更改处理逻辑,即结构化查询本身,否则很难赶时间。这就是我通常说的思考算法的地方(根据要处理的数据,可能会有所不同)。您受到可用CPU内核数量的限制。
如何优化应用程序以获得1秒的处理时间?
更改结构化查询("算法")或其隐藏的工作方式。
以下操作是处理逻辑:
.flatMap(_.split("\W+"))
.groupBy("value")
.count
CCD_ 5价格低廉,速度与CPU内核一样快。你对此无能为力。
您还使用流聚合groupBy
和count
来更改执行所需的任务数(在您的情况下,它将从8更改为默认的shuffle分区数,即200)。
你可以计算在8个内核上运行200个任务所需的CPU节拍数,你需要那么多时间来计算结果。
问题在于Spark默认使用的分区太多(几百个)。在添加spark.sql.shuffle.dartitions选项设置为8(我机器上的内核数量)后,数据处理的持续时间已降至300-400毫秒
当然,这在这种特殊情况下有所帮助,如果这是你可能拥有的唯一硬件,那也没关系。你完了。
在其他内核数量可能会更高的环境中呢?
如果这个数字应该是不变的,如果Spark应用程序将在可能发生变化的基础设施上运行(Spark、Kubernetes、AWS、自动缩放),该怎么办?
这是最难回答的问题。欢迎来到Apache Spark的动态/高度可配置的世界。影响最终结果的因素太多了,通常情况下,你所拥有的就是你最终应该得到的,或者你开始调整许多配置选项,你将不得不花几个小时或几个星期的时间来找出最佳配置。想想你的流媒体查询将处理的不同数据(数据形状、数量和速度)。这增加了混乱。
戴着咨询的帽子,在某个时候,你将不得不决定应用程序的性能是否足够好,或者你将花几个星期的时间希望你能做得比你已经取得的更好(而且必须有人为此买单)。
如果这个数字应该是恒定的或不是
如果你知道你将要处理的所有数据,那么你就可以做出这样一个艰难的假设。
一般情况下不应该这样,这就是Spark为您提供自适应查询执行(视频)的原因。