我们正在编写一个应用程序,允许用户对几个不同的数据库后端(包括Apache Spark)运行SQL查询。我们有一个测试套件,可以针对这些后端运行许多小查询。这些查询在Spark上比在我们支持的其他后端上要慢得多(在某些情况下要慢50倍),这给我们的测试套件带来了问题。
我认为这是因为Spark被设计成一个分布式查询引擎,当查询本身非常小时,它所涉及的开销占主导地位。
作为一个例子,下面是一个脚本,它创建一个表,并对它重复运行一个简单的查询:
CREATE TABLE test (v INT);
INSERT INTO test VALUES (1), (2), (3), (4), (5);
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
SELECT MAX(v) FROM test;
我可以使用Docker在Spark上运行这个脚本:
docker run --rm -i apache/spark:3.3.1 /opt/spark/bin/spark-sql < spark-test.sql
尽管数据量很小,但这些查询从来没有在大约250毫秒内完成。
是否有任何方法配置Spark以减少此开销?我不关心数据完整性或持久性。我们只需要能够检查查询是否按预期执行。
Spark config
其中一些可能是难以处理的开销,但以下设置可以帮助大大加快小火花测试。
"spark.sql.shuffle.partitions" = "1"
默认的shuffle分区是200,这会在小型测试中创建许多小文件,如果在小型数据集上进行大量连接,则影响会更大。
"spark.sql.autoBroadcastJoinThreshold" = "-1"
因为你的数据很小,所有的连接都可以广播(发送到所有的执行器,而不是洗牌)。
配置选项的详细信息请参见spark文档。您可以通过cli使用--conf
设置这些,或者在创建SparkSession对象时使用.config()
。
其他建议上述设置可能对您给出的简单示例没有帮助。我们来深入研究一下
当你运行这些命令时:
CREATE TABLE test (v INT);
INSERT INTO test VALUES (1), (2), (3), (4), (5);
Spark在/opt/spark/work-dir/spark-warehouse/
中创建一个文件夹test
,并创建5个文件,每次插入1个。由于上面给出的原因,小文件是不可取的,所以我们可以使用提示编写单个文件。
INSERT INTO test SELECT /*+ COALESCE(1) */ * FROM VALUES (1), (2), (3), (4), (5);
我们可以做的另一件事是将基于tmpfs
的内存挂载到spark写入的位置,这应该比磁盘快。
docker run --rm --tmpfs /opt/spark/work-dir/ --tmpfs /tmp/ -i apache/spark:3.3.1 /opt/spark/bin/spark-sql < spark-test.sql
我们也可以尝试使用csv代替parquet,这是针对大型列数据CREATE TABLE test (v INT) USING CSV;
进行优化的。我们还可以在插入之后使用CACHE TABLE test;
,同样将表保存在内存中。
在简单的例子中,我看到的改进是适度的~10%,但它可能对您的套件整体有更大的帮助。