PYSPARK:写信给CSV写下Parquet而不是CSV



在下面的代码中,out.csv为parquet格式。我缺少什么选项将其写入CSV文件?

import py4j
from pyspark import SparkConf, SparkContext
from pyspark import HiveContext as hc
import os
from pyspark.sql import SQLContext, Row
from datetime import datetime
from pyspark.sql.types import DateType,StringType
import pyspark.sql.functions as F
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.11:1.5.0'
conf = SparkConf().setMaster("local[64]").setAppName("My App")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
#read parquet file into DF
df = sqlContext.read.parquet('/path/in_parquet')
# Write to csv
df_grouped = df.groupBy('column1').agg(F.sum('column2'))
df_grouped.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("/path/out.csv")

输出:

控制台中的最后几行。另外,这是我用来运行脚本的命令:

spark-submit-马斯特本地[*] - 驱动器 - 驱动器12G-包装com.databricks:spark-csv_2.11:1.2.0 mypyspark.py

$ hdfs dfs -ls /path/out.csv
Found 2 items
-rw-r--r--   3 me devs          0 2017-06-29 12:16 /path/out.csv/_SUCCESS
-rw-r--r--   3 me devs        552 2017-06-29 12:16 /path/out.csv/part-00000

Spark正在分别保存数据的每个分区,因此,您将获得每个分区的文件part-xxxxx。您指定.save("/path/out.csv")的路径是要将文件保存到的目录,其中part-xxxxx文件中的csv格式中已经有。

如果您有多个文件和一个小数据集,则可以使用coalesce(1),然后保存结果以恢复单个csv文件。对于较大的数据集,我建议先保存,然后将文件与FileUtil.copyMerge()(Hadoop命令(合并。

最新更新