我正在spark-sql中执行一个查询,如下所示。表的数据存储在配置单元表中的2个不同节点中。
但由于查询有点慢,我试图在spark中找到一些选项,以便查询可以更快地执行。所以我发现我们可以将sparksql.sql.codegen
和spark.sql.inMemoryColumnarStorage.compressed
配置为true,而不是默认的false。
但我没有任何改进,这两个选项为true的查询需要4.1分钟才能执行。如果此选项为false,也需要4.1分钟。
你明白为什么这些选项不起作用吗?
query = hiveContext.sql("""select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-09-16'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus""");
query.collect();
-
默认情况下,对于spark 2.0启用
spark.sql.codegen.wholeStage
。并且它将从火花催化器方面进行所有可能的内部优化 -
默认情况下,
spark.sql.codegen
(Spark 1.3+中的功能)是false
。即使您将其设为true,也可以与DF.explain / debug
进行交叉检查
但是,请。请重新访问下面在spark 2+中解释的方法。
如果您使用的是较低版本的spark,即1.3或1.4+,则相同的DataFrame方法是有效的,除非我们必须与hiveContext一起使用。
- 根据我的经验,上述查询的Dataset[Row](又名DataFrame)方法比普通的hive查询快一点
请尝试下面的伪代码。
像这样创建一个没有任何聚合、分组、排序的数据帧。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import spark.implicits._
import spark.sql
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
.builder()
.appName("Spark Hive Aggregations")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
val df : DataFrame = sql(("""select l_returnflag, l_linestatus,l_quantity,l_extendedprice,l_quantity ,l_extendedprice,l_quantity, l_extendedprice, l_discount from
lineitem where l_shipdate <= '1998-09-16""");
// can use spark udf or when(cond, evaluation), instead of direct expression
val df1 = df.withColumn("sum_disc_price", df.col("l_extendedprice") * (1 - df.col("l_discount"))
.withColumn("sum_charge", df.col("l_extendedprice") * (1 + df.col("l_tax"))
//NOW SUM, AVG and group by on dataframe
val groupeddf = df1.groupBy(
df1.col("returnflag")
, df1.col("l_linestatus")
.agg(
avg(df1.col("l_quantity")),
, avg(df1.col("l_extendedprice"))
, avg(df1.col("l_discount"))
, sum(df1.col("l_quantity"))
, sum(df1.col("l_extendedprice"))
, sum(df1.col("sum_disc_price"))
, sum(df1.col("sum_charge"))
, count(df1.col("l_linestatus").as("cnt")
) //end agg
) //end group by
//order by on dataframe
.orderBy("l_returnflag"))
.sort("l_linestatus")
val finalDF = groupeddf.select("l_returnflag","l_linestatus",............. etc);
- 此外,还需要考虑执行器内存、执行器/内核数量等参数,以找到确切的问题