I have a file delimited by Ctrl-A (\x01), with the following header:
'filename','file_metadata','data_content','status','errortype','error_message'
I need to dump each record of the file to HDFS as its own file, e.g. basepath/errortype_filename/file.json, where the file's content will be the data_content column.
Sample data:
>>> ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
>>> ff_df.show()
+--------------+-------------+--------------------+------+-------------+--------------------+
| file_name|file_metadata| data_content|status| error_type| error_message|
+--------------+-------------+--------------------+------+-------------+--------------------+
|test_file.json| metadata|{ "fruit": "Apple...|FAILED| INVALID_JSON| could not parse|
|demo_file.json| metadata|{ "fruit": "Apple...|FAILED|MISSING_RULES|No matching rules...|
+--------------+-------------+--------------------+------+-------------+--------------------+
Now I need these two rows written as two files in HDFS, in the folders /tmp/INVALID_JSON_test_file and /tmp/MISSING_RULES_demo_file respectively. I have written the following PySpark code, but I am not getting the desired result. Please help.
def write_file(line):
    tokens = line.split("\x01")
    file_name = tokens[0]
    error_type = tokens[4]
    content = tokens[2]
    # define path to saved file
    directory_name = basePath + "/" + error_type + "/" + file_name
    return directory_name

# get the file content
ff_rdd = sc.textFile("/tmp/pyspark1.txt").map(lambda line: line.split("\x01"))
ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
content_df = ff_df.select("data_content")
file_path = sc.textFile("/tmp/pyspark1.txt").map(lambda line: write_file(line))
content_df.rdd.saveAsTextFile("file_path")
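As a side note on the delimiter: `split("x01")` splits on the literal characters `x`, `0`, `1`, not on the Ctrl-A control character, which must be written as the escape sequence `\x01`. A quick pure-Python check (using one of the sample rows above):

```python
# One Ctrl-A-delimited record, built from the sample data above.
record = ("test_file.json\x01metadata\x01{ \"fruit\": \"Apple\"}"
          "\x01FAILED\x01INVALID_JSON\x01could not parse")

# Splitting on the escape sequence yields the six expected fields.
fields = record.split("\x01")
print(len(fields))   # → 6
print(fields[4])     # → INVALID_JSON

# Splitting on the literal string "x01" does not split this record at all.
print(len(record.split("x01")))  # → 1
```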
Sample input:
+--------------+-------------+-------------------+------+-------------+-----------------+
| file_name|file_metadata| data_content|status| error_type| error_message|
+--------------+-------------+-------------------+------+-------------+-----------------+
|test_file.json| metadata|{ "fruit": "Apple"}|FAILED| INVALID_JSON| could not parse|
|demo_file.json| metadata| { "fruit": "Ab"}|FAILED|MISSING_RULES|No matching rules|
+--------------+-------------+-------------------+------+-------------+-----------------+
First, we will concatenate the error_type column and the file_name column (the file name only, without the extension) to create newColumn.
from pyspark.sql.functions import col, concat, lit, split

final_df = df_new.withColumn("newColumn", concat(col("error_type"), lit("_"), split("file_name", "\\.")[0]))
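Note that the second argument to `split` is a regular expression, so the dot in the file name has to be escaped as `\\.`; an unescaped `.` matches any character. Python's `re` module follows the same convention as the Java regex engine Spark uses, so the difference can be checked locally:

```python
import re

# Escaped dot: split only at the literal "." and keep the file stem.
print(re.split(r"\.", "test_file.json")[0])  # → test_file

# Unescaped dot matches every character, leaving only empty strings.
print(re.split(r".", "test_file.json")[0])   # → (empty string)
```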
When we run final_df.show(truncate=False), we will see the sample output:
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|file_name |file_metadata|data_content |status|error_type |error_message |newColumn |
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|test_file.json|metadata |{ "fruit": "Apple"}|FAILED|INVALID_JSON |could not parse |INVALID_JSON_test_file |
|demo_file.json|metadata |{ "fruit": "Ab"} |FAILED|MISSING_RULES|No matching rules|MISSING_RULES_demo_file|
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
To achieve the required directory structure, e.g. Base_directory/INVALID_JSON_test_file, we have to partition final_df on the newly created newColumn when writing. (Note that Spark writes Hive-style partition directories, so the folders will actually be named newColumn=INVALID_JSON_test_file.)
We can write it with:
final_df.select("data_content","newColumn").write.partitionBy("newColumn").save(FilePath)
By default this will write Parquet files. I don't think it is possible to write the output as text files, since the text format does not accept multiple columns, and we need newColumn to be written along with data_content because we are partitioning the DataFrame on newColumn.
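For comparison, the same per-record layout can be sketched without Spark. The following is a minimal pure-Python version (the helper name `dump_records` and the local `base_path` target are illustrative assumptions; a real HDFS destination would need an HDFS client rather than plain file I/O):

```python
import os
import tempfile

def dump_records(lines, base_path):
    """Write each Ctrl-A-delimited record's data_content to
    <base_path>/<error_type>_<file_stem>/file.json (hypothetical helper)."""
    for line in lines:
        tokens = line.split("\x01")
        file_name, data_content, error_type = tokens[0], tokens[2], tokens[4]
        stem = file_name.rsplit(".", 1)[0]  # drop the extension
        out_dir = os.path.join(base_path, f"{error_type}_{stem}")
        os.makedirs(out_dir, exist_ok=True)
        with open(os.path.join(out_dir, "file.json"), "w") as fh:
            fh.write(data_content)

# The two sample rows from the question, joined with the \x01 delimiter.
lines = [
    "test_file.json\x01metadata\x01{ \"fruit\": \"Apple\"}\x01FAILED\x01INVALID_JSON\x01could not parse",
    "demo_file.json\x01metadata\x01{ \"fruit\": \"Ab\"}\x01FAILED\x01MISSING_RULES\x01No matching rules",
]
base = tempfile.mkdtemp()
dump_records(lines, base)
print(sorted(os.listdir(base)))  # → ['INVALID_JSON_test_file', 'MISSING_RULES_demo_file']
```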