我想将一个大型排序表持久化到S3上的Parquet,然后将其读入并使用排序合并连接策略将其连接到另一个大型排序表。
问题是:即使我事先在连接键上对这些表进行排序,一旦我将它们持久化到Parquet,它们似乎丢失了关于它们排序的信息。是否有任何方式暗示Spark,他们不需要诉诸下次我读他们吗?
我一直在Spark 1.5上尝试这个,我一直得到SQL EXPLAIN
计划看起来像:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[ TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:/....sorted.parquet][pos#284....8424]]
[ TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:....exploded_sorted.parquet][pos#2.....399]]
你可以在那里看到额外的TungstenExchange和TungstenSort阶段,即使这个连接是在两个表上,在保存到Parquet之前,在连接键上进行了orderBy
排序。
看起来这将在Spark 2.0中随着对bucket的支持而到来。
不幸的是,Spark-2.0还不支持通过bucket写入S3。我昨天试用了spark -2.0预览版。
val NUMBER_OF_BUCKETS = 20
rdd.toDF.write.mode(SaveMode.Overwrite)
.bucketBy(NUMBER_OF_BUCKETS,"data_frame_key")
.partitionBy("day")
.save("s3://XXXXX")
并得到这个错误信息:
java.lang.IllegalArgumentException: Currently we don't support writing bucketed data to this data source.
at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:462)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)