I'm using Spark 2.0.
I have a DataFrame. My code looks like this:
df.write.partitionBy("year", "month", "day").format("csv").option("header", "true").save(s"s3://bucket/")
When the program executes, it writes files in the following format:
s3://bucket/year=2016/month=11/day=15/file.csv
How can I configure the output so it looks like this instead:
s3://bucket/2016/11/15/file.csv
I'd also like to know whether the file name can be configured.
Here is the relevant documentation; it seems rather sparse…
http://spark.apache.org/docs/latest/api/scala/index.html org.apache.spark.sql.DataFrameWriter
partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:
year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
This is the expected and desired behavior. Spark uses the directory structure for partition discovery and pruning, and the correct structure, including the column names, is required for that to work.
You also have to remember that partitioning drops the columns used for partitioning from the data files themselves.
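If you want the partition values to stay inside the CSV files as well, a minimal sketch is to duplicate the columns before writing (the *_copy column names are just illustrative, and df is the DataFrame from the question):

import org.apache.spark.sql.functions.col

// Duplicate the partition columns so their values also appear inside the
// written CSV files; partitionBy removes the original columns from the
// file contents but leaves the copies untouched.
df.withColumn("year_copy", col("year"))
  .withColumn("month_copy", col("month"))
  .withColumn("day_copy", col("day"))
  .write.partitionBy("year", "month", "day")
  .format("csv")
  .option("header", "true")
  .save("s3://bucket/")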
If you need a specific directory structure, you should rename the directories with a downstream process.
You can use a script like the one below to rename the directories:
#!/usr/bin/env bash
# Rename partition directories: strip the "COLUMN=" prefix,
# e.g. DATE=20170708 becomes 20170708.
path=$1
col=$2
for f in $(hdfs dfs -ls "$path" | awk '{print $NF}' | grep "$col="); do
  a="$(echo "$f" | sed "s/$col=//")"
  hdfs dfs -mv "$f" "$a"
done
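If you would rather do the rename from inside the Spark application, here is a rough sketch using the Hadoop FileSystem API. The function name and bucket path are placeholders, it assumes spark is an active SparkSession, and note that on S3 a "rename" is really a copy followed by a delete, so it can be slow for large outputs:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch: strip the "col=" prefix from partition directories,
// e.g. .../year=2016 -> .../2016, one directory level at a time.
def stripPartitionPrefix(basePath: String, colName: String): Unit = {
  val fs = FileSystem.get(new java.net.URI(basePath), spark.sparkContext.hadoopConfiguration)
  fs.listStatus(new Path(basePath))
    .filter(_.getPath.getName.startsWith(s"$colName="))
    .foreach { status =>
      val src = status.getPath
      val dst = new Path(src.getParent, src.getName.stripPrefix(s"$colName="))
      fs.rename(src, dst) // on S3 this is a copy followed by a delete
    }
}

// Example: flatten the top level; repeat for month and day inside each year directory.
stripPartitionPrefix("s3://bucket/", "year")

For the nested year/month/day layout you would apply the same rename at each level, and keep in mind that once the column names are gone Spark can no longer discover the partitions automatically.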