将数据帧写成磁盘,在Pyspark中花费了不切实际的时间(SPARK 2.1.1)



我在具有多个CPU的单个服务器上运行pyspark。除写入磁盘外,所有其他操作(读取,加入,过滤,自定义UDF)均快速执行。我要保存的数据框架的大小约为400 GB,带有200个分区。

 sc.getConf().getAll()

驱动程序存储器为16G,工作目录具有足够的空间(> 10TB)

我正在尝试使用以下命令来保存:

 df.repartition(1).write.csv("out.csv")

想知道是否有人遇到了同一问题。在调用Pyspark之前,还会更改任何配置参数帮助解决问题吗?

编辑(一些澄清):

当我指出其他操作很快执行时,转换后总是会有一个措施,就我而言,它们是行计数。因此,所有操作都超级执行。仍然没有解决为什么写作要花这么荒谬的时间。

我的一位同事提出了一个事实,即我们的服务器中的磁盘可能对并发写作有限制,这可能会减慢事情的速度,但仍在调查这一点。有兴趣知道其他人是否也看到在火花集群上的写作时间缓慢。我在AWS群集上对此有一个用户的确认。

所有其他操作(阅读,加入,过滤,自定义UDF)

有因为存在转换 - 他们没有做任何事情,直到必须保存数据为止。

我要保存的数据框架的大小约为400 GB (...) 我正在尝试使用以下命令来保存:

df.repartition(1).write.csv("out.csv")

无法正常工作。即使忽略了使用一台机器的部分,也可以使用一个线程(!)保存400GB。即使成功,它也不比使用普通bash脚本更好。

跳过火花 - 连续写作400GB的写入也需要大量时间,即使平均大小磁盘。并给定多次磁盘(joinrepartition)数据将多次写入磁盘。

经过大量的反复试验,我意识到问题是由于我用来从磁盘读取文件的方法所致。我正在使用内置的读取.csv函数,当我切换到databricks-csv软件包中的读取功能时,问题就消失了。现在,我能够在合理的时间写文件到磁盘。这真的很奇怪,也许是2.1.1中的错误,或者Databricks CSV软件包确实非常优化。

1.Read.csv方法

from pyspark.sql import SparkSession
spark = SparkSession 
    .builder 
    .appName("model") 
    .config("spark.worker.dir", "xxxx") 
    .getOrCreate()
df = spark.read.load("file.csv", format="csv", header = True)
write.csv("file_after_processing.csv")

2.使用Databricks-CSV软件包

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file.csv')
train.write.format('com.databricks.spark.csv').save('file_after_processing.csv')

相关内容

  • 没有找到相关文章

最新更新