如何使用套接字数据源优化流聚合



我在4个CPU内核和8个线程上使用Spark 2.4.0和Scala 2.11。

我写了以下应用程序:

package demos.spark
object WordCounter {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.master("local[4]")
.getOrCreate
import spark.implicits._
spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load
.as[String]
.flatMap(_.split("\W+"))
.groupBy("value")
.count
.writeStream
.outputMode("complete")
.format("console")
.start
.awaitTermination
}
}

使用local[1]的应用程序的处理时间约为60秒。对于local[8],它下降到约15秒,这是我得到的最小值。

我总是通过套接字发送一两句话作为输入。

这是一种预期行为吗?如何优化应用程序以获得1秒的处理时间?

编辑:在这个问题上花了很长时间之后,我终于找到了解决办法。问题在于Spark默认使用的分区太多(几百个)。在添加spark.sql.shuffle.dartitions选项设置为8(我机器上的内核数量)后,数据处理的持续时间已降至300-400毫秒

val spark = SparkSession
.builder
.master("local[*]")
.config("spark.sql.shuffle.partitions", 8)
.getOrCreate

我还不知道,这个数字是否应该是恒定的,如果Spark应用程序将在可能发生变化的基础设施(Spark、Kubernetes、AWS、自动缩放)上运行呢?

4个CPU内核和8个线程。

使用local[*]和Spark将使用与核心数量一样多的处理线程,即4。如果这8个线程是虚拟核;CPU核心";因此8是用于处理的线程的最大数目。

这正是你的测试所证明的,即

对于local[8],它下降到约15秒,这是我得到的最小值。

这是一种预期行为吗?

是的,除非您更改处理逻辑,即结构化查询本身,否则很难赶时间。这就是我通常说的思考算法的地方(根据要处理的数据,可能会有所不同)。您受到可用CPU内核数量的限制。

如何优化应用程序以获得1秒的处理时间?

更改结构化查询("算法")或其隐藏的工作方式。

以下操作是处理逻辑:

.flatMap(_.split("\W+"))
.groupBy("value")
.count

CCD_ 5价格低廉,速度与CPU内核一样快。你对此无能为力。

您还使用流聚合groupBycount来更改执行所需的任务数(在您的情况下,它将从8更改为默认的shuffle分区数,即200)。

你可以计算在8个内核上运行200个任务所需的CPU节拍数,你需要那么多时间来计算结果。

问题在于Spark默认使用的分区太多(几百个)。在添加spark.sql.shuffle.dartitions选项设置为8(我机器上的内核数量)后,数据处理的持续时间已降至300-400毫秒

当然,这在这种特殊情况下有所帮助,如果这是你可能拥有的唯一硬件,那也没关系。你完了。

在其他内核数量可能会更高的环境中呢?

如果这个数字应该是不变的,如果Spark应用程序将在可能发生变化的基础设施上运行(Spark、Kubernetes、AWS、自动缩放),该怎么办?

这是最难回答的问题。欢迎来到Apache Spark的动态/高度可配置的世界。影响最终结果的因素太多了,通常情况下,你所拥有的就是你最终应该得到的,或者你开始调整许多配置选项,你将不得不花几个小时或几个星期的时间来找出最佳配置。想想你的流媒体查询将处理的不同数据(数据形状、数量和速度)。这增加了混乱。

戴着咨询的帽子,在某个时候,你将不得不决定应用程序的性能是否足够好,或者你将花几个星期的时间希望你能做得比你已经取得的更好(而且必须有人为此买单)。

如果这个数字应该是恒定的或不是

如果你知道你将要处理的所有数据,那么你就可以做出这样一个艰难的假设。

一般情况下不应该这样,这就是Spark为您提供自适应查询执行(视频)的原因。

相关内容

  • 没有找到相关文章