这篇文章非常出色地显示了如何用pyspark(Pyspark parse parse text File)将固定宽度文本文件解析到Spark Dataframe中。
我有几个我想解析的文本文件,但是它们的模式略有不同。我不必像上一篇文章所建议的那样为每个过程写出相同的过程,而是想编写一个通用函数,该功能可以分析给定宽度和列名给定固定宽度文本文件。
我是Pyspark的新手
任何帮助将不胜感激!
说我们在示例线程中有一个文本文件:
00101292017you1234
00201302017 me5678
在"/tmp/sample.txt"
中。和一个包含每个文件名,列列表和宽度列表的字典:
schema_dict = {
"sample": {
"columns": ["id", "date", "string", "integer"],
"width" : [3, 8, 3, 4]
}
}
我们可以加载数据框并将它们分成列迭代,使用:
import numpy as np
input_path = "/tmp/"
df_dict = dict()
for file in schema_dict.keys():
df = spark.read.text(input_path + file + ".txt")
start_list = np.cumsum([1] + schema_dict[file]["width"]).tolist()[:-1]
df_dict[file] = df.select(
[
df.value.substr(
start_list[i],
schema_dict[file]["width"][i]
).alias(schema_dict[file]["columns"][i]) for i in range(len(start_list))
]
)
+---+--------+------+-------+
| id| date|string|integer|
+---+--------+------+-------+
|001|01292017| you| 1234|
|002|01302017| me| 5678|
+---+--------+------+-------+