读入文件并将其拆分为两个数据帧(Pyspark,spark数据帧)



我对Pyspark(和Spark(还很陌生,有一项具体的任务要解决,目前我还不知道:(。

我有一堆如下结构的文件:

"文件_A.dtx":

## Animal
# Header Start
Name, Type, isMammal
# Body Start
Hasi, Rabbit, yes
Birdi, Bird, no
Cathi, Cat, yes
## House
# Header Start
Street, Number
# Body Start
Main Street, 32
Buchengasse, 11

"文件_B.dtx":

## Animal
# Header Start
Name, Type, isMammal
# Body Start
Diddi, Dog, yes
Eli, Elephant, yes
## House
# Header Start
Street, Number
# Body Start
Strauchweg, 13
Igelallee, 22

我的预期结果是如下两个数据帧:

动物:

| Filename   | Name    | Type     | isMammal    | 
| ---------- | ------- | -------- | ----------- | 
| File_A.dtx | Hasi    | Rabbit   | yes         | 
| File_A.dtx | Birdi   | Bird     | no          | 
| File_A.dtx | Cathi   | Cat      | yes         | 
| File_B.dtx | Diddi   | Dog      | yes         | 
| File_B.dtx | Eli     | Elephant | yes         | 

房屋:

| Filename   | Street       | Number   | 
| ---------- | ------------ | -------- | 
| File_A.dtx | Main Street  | 32       | 
| File_A.dtx | Buchengasse  | 11       | 
| File_B.dtx | Strauchweg   | 13       | 
| File_B.dtx | Igelallee    | 22       | 

解决方案应该能够并行工作。它可以按文件工作,因为每个文件都很小(大约3 MB(,但我有很多。

非常感谢你的提示。

我现在只有:

from  pyspark.sql.functions import input_file_name
df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())

现在我的主要问题是,如何根据行## Animal## House拆分数据帧,并将其再次聚合为数据帧以完成任务?

假设您知道前一手的格式,并且没有两个数据帧具有相同的列数。然后您可以执行以下操作:

  1. 从数据集中删除注释(行以#开头(
  2. 从数据集中删除标题行
  3. 删除空行
  4. 使用,拆分线路
  5. 在步骤4中,从df创建animals_df作为行的子集,其中来自拆分的数组的大小等于3,并将数组值提取为列
  6. 在步骤4中,从df创建house_df作为行的子集,其中来自拆分的数组的大小等于2,并将数组值提取为列
from  pyspark.sql.functions import element_at, input_file_name, length, col as c, split, size
filelist = ["File_A.dtx", "File_B.dtx"]
df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())
# STEP 1
comment_removed = df1.filter(~(c("value").startswith("#")))
# STEP 2
header_removed = comment_removed.filter(~(c("value").isin("Name, Type, isMammal", "Street, Number")))
# STEP 3
remove_empty_lines = header_removed.filter(length("value") > 0)
# STEP 4
processed_df = remove_empty_lines.withColumn("value", split("value", ",")).withColumn("Filename", element_at(split("Filename", "/"), -1)).cache()
# STEP 5
animals_df = processed_df.filter(size("value") == 3).selectExpr("Filename", "value[0] as Name", "value[1] as Type", "value[2] as isMammal")
animals_df.show()
"""
+----------+-----+---------+--------+
|  Filename| Name|     Type|isMammal|
+----------+-----+---------+--------+
|File_A.dtx| Hasi|   Rabbit|     yes|
|File_A.dtx|Birdi|     Bird|      no|
|File_A.dtx|Cathi|      Cat|     yes|
|File_B.dtx|Diddi|      Dog|     yes|
|File_B.dtx|  Eli| Elephant|     yes|
+----------+-----+---------+--------+
"""
# STEP 6
house_df = processed_df.filter(size("value") == 2).selectExpr("Filename", "value[0] as Street", "cast(value[1] as int) as Number")
house_df.show()
"""
+----------+-----------+------+
|  Filename|     Street|Number|
+----------+-----------+------+
|File_A.dtx|Main Street|    32|
|File_A.dtx|Buchengasse|    11|
|File_B.dtx| Strauchweg|    13|
|File_B.dtx|  Igelallee|    22|
+----------+-----------+------+
"""

最新更新