使用 PySpark 将每行的每一列作为单独的文件写入 S3



我有一个用例,程序需要将数据帧中的每一列作为单独的文件写入 S3 或 EMR 上的 HDFS。我正在对原始数据进行一些处理,输出数据框如下所示;

+------+--------------------+--------------------+--------------------+--------------------+
|    id|         processed_1|         processed_2|         processed_3|               error|
+------+--------------------+--------------------+--------------------+--------------------+
|324650|some processed data |some processed data | some processed data|                null|
+------+--------------------+--------------------+--------------------+--------------------+

对于 3 列processed_1processed_2processed_3、 我想将每行的每一列存储在一个单独的文件中。我有 100k 行已处理的数据。我尝试使用 UDF 和 Python 来做到这一点;

def writeToDisk(doc_id,error, processed_1, processed_2, processed_3):

try:
if error is None:
with open(r'hdfs://processed_1.json'.format(doc_id),'w',encoding='utf-8') as f:
f.write(processed_1)
with open(r'hdfs://processed_2.json'.format(doc_id),'w') as f:
f.write(processed_2)

with open(r'hdfs://processed_3.json'.format(doc_id),'w') as f:
f.write(processed_3)
return "SUCCESS"

else:
error_prefix='{} - root - ERROR - '.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
final_error_msg='{}{}'.format(error_prefix,error)
with open(r'hdfs://error.log'.format(doc_id),'w') as f:
f.write(unprocessed_html)

return "SUCCESS"

except Exception as e:
with open(r'hdfs://error.log','w') as f:
f.write("Failed : {}".format(str(e)))
return "FAILED"

并将上述函数注册为 udf 并在 as 中使用;

store_data_udf = udf(writeToDisk, StringType())
stored_data = final_data.withColumn("store_results",store_data_udf("id","error","processed_1","processed_2","processed_3"))

上述方法不起作用。我不确定我在这里错过了什么。

关于如何完成这项任务的任何想法将不胜感激。

你不能使用 python 写文件函数写入 HDFS。相反,您可以创建 3 个包含所需列的单独数据帧,并将其写入 hdfs/s3。

from pyspark.sql import SparkSession
from pyspark.sql.functions import  monotonically_increasing_id
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
file_1 = {"id": 1, "error": 20, 'processed_1': "test", 'processed_2': "test2", 'processed_3': "test3"}
file_2 = {"id": 2, "error": 30, 'processed_1': "test5", 'processed_2': "test6", 'processed_3': "test7"}
final_data = spark.read.json(sc.parallelize([file_1,file_2]))
df1=final_data.select("id","error","processed_1").withColumn("num", monotonically_increasing_id())
df2=final_data.select("id","error","processed_2").withColumn("num", monotonically_increasing_id())
df3=final_data.select("id","error","processed_3").withColumn("num", monotonically_increasing_id())
df1.coalesce(1).write.partitionBy("num").parquet("df1/")
df2.coalesce(1).write.partitionBy("num").parquet("df2/")
df3.coalesce(1).write.partitionBy("num").parquet("df3/")

最新更新