如何将PySpark中的表数据框导出为csv



我使用Spark 1.3.1 (PySpark),我使用SQL查询生成了一个表。我现在有一个物体是DataFrame。我想将这个DataFrame对象(我称之为"表")导出为csv文件,以便我可以操纵它并绘制列。如何将DataFrame"表"导出为csv文件?

谢谢!

如果数据帧适合驱动程序内存,并且您想保存到本地文件系统,您可以使用toPandas方法将Spark DataFrame转换为本地Pandas DataFrame,然后简单地使用to_csv:

df.toPandas().to_csv('mycsv.csv')

否则可以使用spark-csv:

  • 1.3火花
    df.save('mycsv.csv', 'com.databricks.spark.csv')
    
  • 1.4 +火花

    df.write.format('com.databricks.spark.csv').save('mycsv.csv')
    

在Spark 2.0+中,您可以直接使用csv数据源:

df.write.csv('mycsv.csv')

对于Apache Spark 2+,为了将数据帧保存到单个csv文件中。使用以下命令

query.repartition(1).write.csv("cc_out.csv", sep='|')

这里的1表示我只需要一个csv分区。

如果您不能使用spark-csv,您可以这样做:

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")

如果你需要处理带有换行符或逗号的字符串,那将不起作用。使用:

import csv
import cStringIO
def row2csv(row):
    buffer = cStringIO.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s).encode("utf-8") for s in row])
    buffer.seek(0)
    return buffer.read().strip()
df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")

您需要在单个分区中重新划分Dataframe然后以Unix文件系统格式定义文件的格式,路径和其他参数好了,

df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')

阅读有关重分区功能的更多信息阅读更多关于保存功能的信息

但是,重分区是一个开销很大的函数,toPandas()是最糟糕的。尝试使用。coalesce(1)代替。repartition(1),以获得更好的性能。

使用PySpark

在Spark 3.0+中编写csv的最简单方法

sdf.write.csv("/path/to/csv/data.csv")

可以根据您正在使用的spark节点的数量生成多个文件。如果你想把它放在一个单独的文件中,可以使用重分区。

sdf.repartition(1).write.csv("/path/to/csv/data.csv")
使用熊猫

如果您的数据不是太多,并且可以在本地python中保存,那么您可以使用pandas too

sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)
使用考拉

sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)

这样如何(如果您不想要一行)?

for row in df.collect():
    d = row.asDict()
    s = "%dt%st%sn" % (d["int_column"], d["string_column"], d["string_column"])
    f.write(s)

f是一个打开的文件描述符。分隔符也是一个TAB字符,但它很容易改变为任何你想要的

'''
I am late to the pary but: this will let me rename the file, move it to a desired directory and delete the unwanted additional directory spark made
'''
import shutil
import os
import glob
path = 'test_write'
#write single csv
students.repartition(1).write.csv(path)
#rename and relocate the csv
shutil.move(glob.glob(os.getcwd() + '\' + path + '\' + r'*.csv')[0], os.getcwd()+ '\' + path+ '.csv')
#remove additional directory
shutil.rmtree(os.getcwd()+'\'+path)

我在pandas中使用了这个方法,结果导致了糟糕的性能。最后花了很长时间,我停下来寻找另一种方法。

如果你正在寻找一种方法来写入一个csv而不是多个csv,这将是你要找的:

df.coalesce(1).write.csv("train_dataset_processed", header=True)

它将处理我的数据集从2+小时减少到2分钟

尝试显示(df)并在结果中使用下载选项。请注意:只有100万行可以下载这个选项,但它真的很快。

相关内容

  • 没有找到相关文章

最新更新