spark1.6(或所有低于2的Spark版本(中的
我试图通过id将dataFrame保存到csv分区中,为此我使用了spark 1.6和scala。函数partitionBy("id"(没有给我正确的结果。
我的代码在这里:
validDf.write
.partitionBy("id")
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", ";")
.mode("overwrite")
.save("path_hdfs_csv")
My Dataframe looks like :
-----------------------------------------
| ID | NAME | STATUS |
-----------------------------------------
| 1 | N1 | S1 |
| 2 | N2 | S2 |
| 3 | N3 | S1 |
| 4 | N4 | S3 |
| 5 | N5 | S2 |
-----------------------------------------
此代码创建3个csv默认分区(part_0、part_1、part_2(,不基于列ID。
我所期望的是:为每个id获取子目录或分区。有什么帮助吗?
Spark csv不支持分区
您的代码适用于spark>2.0.0。
对于您的spark版本,您需要首先准备csv并将其保存为文本(分区适用于spark-text
(:
import org.apache.spark.sql.functions.{col,concat_ws}
val key = col("ID")
val concat_col = concat_ws(",",df.columns.map(c=>col(c)):_*) // concat cols to one col
val final_df = df.select(col("ID"),concat_col) // dataframe with 2 columns: id and string
final_df.write.partitionBy("ID").text("path_hdfs_csv") //save to hdfs