How to save each row of a DataFrame as a file in HDFS using pyspark



I have a ctrl-A delimited file with the following header:

'filename','file_metadata','data_content','status','errortype','error_message'

I need to dump each record of this file into HDFS as its own file, e.g. basepath_errortype_filename/file.json, where the file content is the data_content column.

Sample data:

>>> ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
>>> ff_df.show()
+--------------+-------------+--------------------+------+-------------+--------------------+
|     file_name|file_metadata|        data_content|status|   error_type|       error_message|
+--------------+-------------+--------------------+------+-------------+--------------------+
|test_file.json|     metadata|{ "fruit": "Apple...|FAILED| INVALID_JSON|     could not parse|
|demo_file.json|     metadata|{ "fruit": "Apple...|FAILED|MISSING_RULES|No matching rules...|
+--------------+-------------+--------------------+------+-------------+--------------------+

Now I need these two rows written as two files in HDFS, in the folders /tmp/INVALID_JSON_test_file and /tmp/MISSING_RULES_demo_file respectively. I have written the pyspark code below, but it does not produce the desired result. Please help.

def write_file(line):
    tokens = line.split("\x01")
    file_name = tokens[0]
    error_type = tokens[4]
    content = tokens[2]
    # define the path the record should be saved under
    directory_name = basePath + "/" + error_type + "_" + file_name
    return directory_name

# get the file content
ff_rdd = sc.textFile("/tmp/pyspark1.txt").map(lambda line: line.split("\x01"))
ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
content_df = ff_df.select("data_content")
file_path = sc.textFile("/tmp/pyspark1.txt").map(lambda line: write_file(line))
content_df.rdd.saveAsTextFile("file_path")

Sample input:

+--------------+-------------+-------------------+------+-------------+-----------------+
|     file_name|file_metadata|       data_content|status|   error_type|    error_message|
+--------------+-------------+-------------------+------+-------------+-----------------+
|test_file.json|     metadata|{ "fruit": "Apple"}|FAILED| INVALID_JSON|  could not parse|
|demo_file.json|     metadata|   { "fruit": "Ab"}|FAILED|MISSING_RULES|No matching rules|
+--------------+-------------+-------------------+------+-------------+-----------------+
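(The steps below assume this data is already loaded into a DataFrame called df_new. As a minimal sketch, it could be built from the ctrl-A delimited file in the question; the path /tmp/pyspark1.txt and the SparkContext sc are assumptions carried over from the question.)

# minimal sketch: load the \x01 (ctrl-A) delimited file and name the columns
ff_rdd = sc.textFile("/tmp/pyspark1.txt").map(lambda line: line.split("\x01"))
df_new = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])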

First, we concatenate the error_type column and the file_name column (only the base name, without the extension) to create newColumn:

from pyspark.sql.functions import col, concat, lit, split

final_df = df_new.withColumn("newColumn", concat(col("error_type"), lit("_"), split(col("file_name"), "\\.")[0]))

Running final_df.show(truncate=False) gives the following sample output:

+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|file_name     |file_metadata|data_content       |status|error_type   |error_message    |newColumn              |
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|test_file.json|metadata     |{ "fruit": "Apple"}|FAILED|INVALID_JSON |could not parse  |INVALID_JSON_test_file |
|demo_file.json|metadata     |{ "fruit": "Ab"}   |FAILED|MISSING_RULES|No matching rules|MISSING_RULES_demo_file|
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+

To achieve the required directory structure, e.g. Base_directory/INVALID_JSON_test_file, we have to partition final_df on the newColumn we created when writing it out.

We can write it with:

final_df.select("data_content","newColumn").write.partitionBy("newColumn").save(FilePath)

By default this writes parquet files. I don't think the output can be written as text files, because the text format does not accept multiple columns, and we need newColumn alongside data_content since we are partitioning the DataFrame on newColumn.
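As a usage sketch (the base path below is only an assumed example, and a SparkSession named spark is assumed), Spark lays the output out as one newColumn=<value> sub-directory per distinct value, and a single record's data_content can then be read back from its own partition directory:

# assumed example base path; FilePath in the snippet above plays the same role
FilePath = "/tmp/base_directory"

final_df.select("data_content","newColumn").write.mode("overwrite").partitionBy("newColumn").save(FilePath)

# the partitioned write produces Hive-style directories such as
#   /tmp/base_directory/newColumn=INVALID_JSON_test_file/part-*.parquet
# read one record's content back from its partition directory
invalid_json_df = spark.read.parquet(FilePath + "/newColumn=INVALID_JSON_test_file")
invalid_json_df.show(truncate=False)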
