我在具有多个CPU的单个服务器上运行pyspark。除写入磁盘外,所有其他操作(读取,加入,过滤,自定义UDF)均快速执行。我要保存的数据框架的大小约为400 GB,带有200个分区。
sc.getConf().getAll()
驱动程序存储器为16G,工作目录具有足够的空间(> 10TB)
我正在尝试使用以下命令来保存:
df.repartition(1).write.csv("out.csv")
想知道是否有人遇到了同一问题。在调用Pyspark之前,还会更改任何配置参数帮助解决问题吗?
编辑(一些澄清):
当我指出其他操作很快执行时,转换后总是会有一个措施,就我而言,它们是行计数。因此,所有操作都超级执行。仍然没有解决为什么写作要花这么荒谬的时间。
我的一位同事提出了一个事实,即我们的服务器中的磁盘可能对并发写作有限制,这可能会减慢事情的速度,但仍在调查这一点。有兴趣知道其他人是否也看到在火花集群上的写作时间缓慢。我在AWS群集上对此有一个用户的确认。
所有其他操作(阅读,加入,过滤,自定义UDF)
有因为存在转换 - 他们没有做任何事情,直到必须保存数据为止。
我要保存的数据框架的大小约为400 GB (...) 我正在尝试使用以下命令来保存:
df.repartition(1).write.csv("out.csv")
无法正常工作。即使忽略了使用一台机器的部分,也可以使用一个线程(!)保存400GB。即使成功,它也不比使用普通bash脚本更好。
跳过火花 - 连续写作400GB的写入也需要大量时间,即使平均大小磁盘。并给定多次磁盘(join
,repartition
)数据将多次写入磁盘。
经过大量的反复试验,我意识到问题是由于我用来从磁盘读取文件的方法所致。我正在使用内置的读取.csv函数,当我切换到databricks-csv软件包中的读取功能时,问题就消失了。现在,我能够在合理的时间写文件到磁盘。这真的很奇怪,也许是2.1.1中的错误,或者Databricks CSV软件包确实非常优化。
1.Read.csv方法
from pyspark.sql import SparkSession
spark = SparkSession
.builder
.appName("model")
.config("spark.worker.dir", "xxxx")
.getOrCreate()
df = spark.read.load("file.csv", format="csv", header = True)
write.csv("file_after_processing.csv")
2.使用Databricks-CSV软件包
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file.csv')
train.write.format('com.databricks.spark.csv').save('file_after_processing.csv')