我正在使用pyspark流到S3的ETL输入文件。
我需要能够建立所有原始输入文件的审计跟踪在s3://上,我的镶木地板输出最终在hdfs://上。
给定一个数据流、rdd,甚至一个特定的rdd分区,这可能吗确定s3中输入数据的原始文件名?
目前,我知道的唯一方法是rdd.toDebugString()
,并尝试解析它。然而,这感觉真的很糟糕,而且没有在某些情况下工作。例如,解析调试输出对于我我也在做(使用sc.TextFile("s3://...foo/*")
样式的globs)。
有人有一个合理的方法来确定原始文件名吗?
似乎其他一些spark用户在过去也有过这个问题,因为示例:
http://apache-spark-user-list.1001560.n3.nabble.com/Access-original-filename-in-a-map-function-tt2831.html
谢谢!
我们遇到了同样的问题,而且文件足够小,所以我们使用了sc.wholeTextFiles("s3:...foo/*")
。
它创建了CCD_ 4的RDD,并且我们将文件名附加到文件的内容中以供使用。
如何将RDD[(String,String)]转换为RDD[Array[String]]?