我在文档中发现了几篇帖子、文章、参考资料等,这些帖子、文章、参考资料等暗示了您可以使用foreachPartition
访问特定分区的想法。但是,我还没有弄清楚如何处理给定分区中的所有数据。
我的目标是从数据库中选择一些数据,对其进行操作,按列中的唯一值进行分区,然后将每个分区作为单个特定命名的 jsonl 文件写入 s3,以供其他系统访问。
repartitioned = myDataframe.repartition("processed_date")
repartitioned.foreachPartition(writePartitionToS3)
我已经尝试了很多方法来解析这些数据,但似乎我只能在foreachPartition
中获取单个元组,并且分区本身没有界限,以便有效地分离这些数据。
def writePartitionsToS3(partition):
for row in partition:
pprint (row)
生成(为简洁起见,删除了几列):
行(entity_id=u'2315183', ...processed_date=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ...processed_date=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25)) row(entity_id=u'2315183', ...processed_date=datetime.date(2015, 3, 25))
也有可能分区没有按照我认为的方式定义,但我知道有一个内置的DataFrameWriter
可以按分区写入,但我不能使用它。我真的需要能够生成这样的命名文件,而不是 part-xxx 格式:
s3a://<bucket>/<prefix>/<date processed>.jsonl
我正在以这样一种方式分块数据,即分区的大小相对较小(每个processed_date一个,每个实体被选为自己的 DataFrame),所以这不是问题,但我也不是真的想在一个节点上collect()
所有内容来解析分区列表,因为我想并行将文件写入 s3。
更新:
我最终通过获取唯一值然后基于这些值过滤原始数据集来实际解决我的情况的问题。请记住,如果数据集非常大,您将永远不想这样做,但我可以选择,因为我在循环中创建小型数据帧(从数据库中选择),然后处理这些块。
# Get a list of the unique values present
# in the processed_date column
uniqueProcessedDates = myDataframe.select('processed_date')
.distinct().rdd.map(lambda r: r[0]).collect()
# For each unique processed date we want to
# filter records and then write them
for value in uniqueProcessedDates:
sortedRowsThisProcessedDate = myDataframe.filter(postgresDF.processed_date == date)
# some custom function to write the data
writeProcessedDatesToS3(sortedRowsThisProcessedDate.collect())
综上所述,我相信有很多方法可以做到这一点,效率非常低。我正在考虑的一件事是根据需要写入每个文件的确切值集对每个RDD进行重新分区,因为对s3的写入必须以原子方式完成。我认为除此之外,这可能有助于避免在写入数据之前从多个节点收集。
没有限制可访问。DataFrame.repartition
使用哈希分区程序来打乱数据,因此行的重现没有更广泛的含义。您在这里可以假设的是特定processed_date
的所有记录都位于特定分区上。
您可以通过添加sortWithinPartitions
来改善情况:
(myDataframe
.repartition("processed_date")
.sortWithinPartitions("processed_date"))
以便能够逐个访问单个日期的所有记录。
另一个可能的改进是使用orderBy
方法:
myDataframe.orderBy("processed_date")
这将导致连续的日期,但仍然无法访问边界。
在这两种情况下,您都必须在迭代分区时手动检测边界。
最后,如果你想要真正的控制,请使用RDD
和repartitionAndSortWithinPartitions
方法。这将为您提供对数据分布的精细控制。您可以定义partitionFunc
以特定方式分发数据,因此预先没有分区边界。
DataFrameWriter.partitionBy
使用不同的机制,在这里对您没有用。