是否可以对Apache Spark快速运行小型测试查询?



我们正在编写一个应用程序,允许用户对几个不同的数据库后端(包括Apache Spark)运行SQL查询。我们有一个测试套件,可以针对这些后端运行许多小查询。这些查询在Spark上比在我们支持的其他后端上要慢得多(在某些情况下要慢50倍),这给我们的测试套件带来了问题。

我认为这是因为Spark被设计成一个分布式查询引擎,当查询本身非常小时,它所涉及的开销占主导地位。

作为一个例子,下面是一个脚本,它创建一个表,并对它重复运行一个简单的查询:

CREATE TABLE test (v INT);
INSERT INTO test VALUES (1), (2), (3), (4), (5);
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;

我可以使用Docker在Spark上运行这个脚本:

docker run --rm -i apache/spark:3.3.1 /opt/spark/bin/spark-sql < spark-test.sql

尽管数据量很小,但这些查询从来没有在大约250毫秒内完成。

是否有任何方法配置Spark以减少此开销?我不关心数据完整性或持久性。我们只需要能够检查查询是否按预期执行。

Spark config

其中一些可能是难以处理的开销,但以下设置可以帮助大大加快小火花测试。

"spark.sql.shuffle.partitions" = "1"

默认的shuffle分区是200,这会在小型测试中创建许多小文件,如果在小型数据集上进行大量连接,则影响会更大。

"spark.sql.autoBroadcastJoinThreshold" = "-1"

因为你的数据很小,所有的连接都可以广播(发送到所有的执行器,而不是洗牌)。

配置选项的详细信息请参见spark文档。您可以通过cli使用--conf设置这些,或者在创建SparkSession对象时使用.config()

其他建议上述设置可能对您给出的简单示例没有帮助。我们来深入研究一下

当你运行这些命令时:

CREATE TABLE test (v INT);
INSERT INTO test VALUES (1), (2), (3), (4), (5);

Spark在/opt/spark/work-dir/spark-warehouse/中创建一个文件夹test,并创建5个文件,每次插入1个。由于上面给出的原因,小文件是不可取的,所以我们可以使用提示编写单个文件。

INSERT INTO test SELECT /*+ COALESCE(1) */ * FROM VALUES (1), (2), (3), (4), (5);

我们可以做的另一件事是将基于tmpfs的内存挂载到spark写入的位置,这应该比磁盘快。

docker run --rm --tmpfs /opt/spark/work-dir/ --tmpfs /tmp/ -i apache/spark:3.3.1 /opt/spark/bin/spark-sql < spark-test.sql

我们也可以尝试使用csv代替parquet,这是针对大型列数据CREATE TABLE test (v INT) USING CSV;进行优化的。我们还可以在插入之后使用CACHE TABLE test;,同样将表保存在内存中。

在简单的例子中,我看到的改进是适度的~10%,但它可能对您的套件整体有更大的帮助。

最新更新