我正在 PySpark 2.4 中创建JDBC
查询的临时视图。我的数据源是MS SQL Server 2017
。
df = spark.read.format("jdbc").options(url="url",properties = { "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" },dbtable="dbtable").load()
df.createOrReplaceTempView("df_temp")
现在我可以将临时创建的表查询为
df_new = spark.sql("select * from df_temp where ...#standard query")
现在我想将上述df_new
写为CSV
我的本地驱动器。一种方法(目前我正在做同样的事情)是转换该df_new.toPandas()
。然后将其另存为csv
(标准df.to_csv('/path/file.csv'
方法)。此方法创建一个名为file.csv
的文件夹,并在此文件夹中生成名称以part-00000-fd4c62bd-f208-4bd3-ae99-f81338b9ede1-c000.csv
开头的 csv 文件。
因此,如果我每天运行.py
文件(使用像crontab
这样的调度程序),这绝对不是一个好的选择。
问题:
如何标准化.csv文件的名称,并每天将新数据附加到同一文件中?
有没有简单的方法可以将
df_new
转换为表或Spark DataFrame
.这样我就可以申请df.coalesce(1).option("header","true").csv('/path/myfile.csv')
?
使用它保存到本地文件系统
#for Output in multiple files:
df.write.option("header", "true").csv("/path/output.csv")
#for output in single file:
df.coalesce(1).write.option("header", "true").csv("/path/output.csv")