PySpark nested DataFrames



I need some help with Python PySpark. I have a source object that looks like the following (EDIT: both lists are numpy arrays with numpy dtypes):

obj = [("thing1", ([1,2,3], [0.1,0.2,0.3]),
("thing2", ([1,2,3], [0.1,0.2,0.3]),
("thing3", ([1,2,3], [0.1,0.2,0.3]),
("thing4", ([1,2,3], [0.1,0.2,0.3]),
...]

Is there a way to create a Spark DataFrame like this, where "column2" is essentially another DataFrame with two columns labelled "label1" and "label2"?

"column1" | "column2"
---------------------
"thing1"  | [{"label1": 1, "label2": 0.1}, {"label1": 2, "label2": 0.2}, {"label1": 3, "label2": 0.3}]
"thing2"  | [{"label1": 1, "label2": 0.1}, {"label1": 2, "label2": 0.2}}]
...

My end goal is to produce a JSON object like the one below; I need both the column names and the labels:

{{"column1":"thing1", 
"column2":[{"1abel1":1, "label2":0.1},{"1abel1":2, "label2":0.2},{"1abel1":3, "label2":0.3},
{"column1":"thing2", 
"column2":[{"1abel1":1, "label2":0.1},{"1abel1":2, "label2":0.2},{"1abel1":3, "label2":0.3},
{"column1":"thing3", 
"column2":[{"1abel1":1, "label2":0.1},{"1abel1":2, "label2":0.2},{"1abel1":3, "label2":0.3},
...}

It would also be great if this could handle roughly one million records reasonably quickly.

You can try the following approaches, neither of which relies on a UDF:

Approach 1

You can use transform to achieve this, since you always expect the same number of label1 and label2 entries; otherwise you can modify the code below to perform a check (a sketch of one such check follows the output), e.g.

transformed_df = (
    df.withColumn(
        "column3",
        # pair each label2 value with the label1 value at the same index,
        # producing an array of {label1, label2} structs
        F.transform(
            "column2.label2",
            lambda entry, index: F.struct(
                F.col("column2.label1")[index].alias("label1"),
                F.lit(entry).alias("label2")
            )
        )
    )
)
transformed_df.show(truncate=False)
transformed_df.printSchema()
+-------+----------------------------+------------------------------+
|column1|column2                     |column3                       |
+-------+----------------------------+------------------------------+
|thing1 |{[1, 2, 3], [0.1, 0.2, 0.3]}|[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
|thing2 |{[1, 2, 3], [0.1, 0.2, 0.3]}|[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
|thing3 |{[1, 2, 3], [0.1, 0.2, 0.3]}|[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
|thing4 |{[1, 2, 3], [0.1, 0.2, 0.3]}|[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
+-------+----------------------------+------------------------------+
root
|-- column1: string (nullable = true)
|-- column2: struct (nullable = true)
|    |-- label1: array (nullable = true)
|    |    |-- element: integer (containsNull = true)
|    |-- label2: array (nullable = true)
|    |    |-- element: float (containsNull = true)
|-- column3: array (nullable = true)
|    |-- element: struct (containsNull = false)
|    |    |-- label1: integer (nullable = true)
|    |    |-- label2: float (nullable = true)
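
If the two arrays cannot be trusted to have the same length, one possible shape for the check mentioned above (my own sketch, not part of the original answer) is to compare the array sizes up front and only build the struct array when they match, leaving column3 null for mismatched rows:

checked_df = df.withColumn(
    "column3",
    F.when(
        # only pair the arrays when they contain the same number of entries
        F.size("column2.label1") == F.size("column2.label2"),
        F.transform(
            "column2.label2",
            lambda entry, index: F.struct(
                F.col("column2.label1")[index].alias("label1"),
                entry.alias("label2")
            )
        )
    )  # no otherwise() -> column3 stays null where the sizes differ
)
checked_df.show(truncate=False)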

Approach 2

You can also map the transformation over the underlying RDD, e.g.

from pyspark.sql import Row

# rebuild column2 row by row: zip label1/label2 by position into a list of dicts
transformed_df = df.rdd.map(lambda row: Row(
    column1=row['column1'],
    column2=[{
        'label1': entry,
        'label2': row['column2']['label2'][index]
    } for index, entry in enumerate(row['column2']['label1'])]
)).toDF(schema="column1 string, column2 array<struct<label1: int, label2: float>>")
transformed_df.show(truncate=False)
transformed_df.printSchema()
+-------+------------------------------+
|column1|column2                       |
+-------+------------------------------+
|thing1 |[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
|thing2 |[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
|thing3 |[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
|thing4 |[{1, 0.1}, {2, 0.2}, {3, 0.3}]|
+-------+------------------------------+
root
|-- column1: string (nullable = true)
|-- column2: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- label1: integer (nullable = true)
|    |    |-- label2: float (nullable = true)
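
Neither approach shows the last step to the JSON objects in the question. A minimal sketch starting from the Approach 2 result (the /tmp/things_json output path is just a placeholder) is to let Spark serialise the rows itself, which also keeps the work distributed for the roughly one million records mentioned:

# each output line is one JSON object of the form
# {"column1": "...", "column2": [{"label1": ..., "label2": ...}, ...]}
transformed_df.write.mode("overwrite").json("/tmp/things_json")

# or, to inspect a couple of records without writing files:
for js in transformed_df.toJSON().take(2):
    print(js)

For the Approach 1 result you would first select("column1", F.col("column3").alias("column2")) so the field name matches the desired output.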

Setup

For debugging purposes, the code to reproduce this setup is included below.

from pyspark.sql import functions as F
from pyspark.sql import types as T

data = [
    ("thing1", ([1, 2, 3], [0.1, 0.2, 0.3])),
    ("thing2", ([1, 2, 3], [0.1, 0.2, 0.3])),
    ("thing3", ([1, 2, 3], [0.1, 0.2, 0.3])),
    ("thing4", ([1, 2, 3], [0.1, 0.2, 0.3]))
]
df = spark.createDataFrame(
    data,
    T.StructType([
        T.StructField("column1", T.StringType()),
        T.StructField(
            "column2",
            T.StructType([
                T.StructField("label1", T.ArrayType(T.IntegerType())),
                T.StructField("label2", T.ArrayType(T.FloatType())),
            ])
        )
    ])
)
df.show(truncate=False)
+-------+----------------------------+
|column1|column2                     |
+-------+----------------------------+
|thing1 |{[1, 2, 3], [0.1, 0.2, 0.3]}|
|thing2 |{[1, 2, 3], [0.1, 0.2, 0.3]}|
|thing3 |{[1, 2, 3], [0.1, 0.2, 0.3]}|
|thing4 |{[1, 2, 3], [0.1, 0.2, 0.3]}|
+-------+----------------------------+
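
Since the question's edit says both inner lists are actually numpy arrays, createDataFrame may reject the numpy scalar types depending on the Spark version. A minimal sketch (the obj literal below is hypothetical, shaped like the question's source object) that converts them to plain Python lists first:

import numpy as np
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("column1", T.StringType()),
    T.StructField("column2", T.StructType([
        T.StructField("label1", T.ArrayType(T.IntegerType())),
        T.StructField("label2", T.ArrayType(T.FloatType())),
    ]))
])

# hypothetical source object mirroring the question, built from numpy arrays
obj = [
    ("thing1", (np.array([1, 2, 3]), np.array([0.1, 0.2, 0.3]))),
    ("thing2", (np.array([1, 2, 3]), np.array([0.1, 0.2, 0.3]))),
]

# ndarray.tolist() converts the arrays and their numpy scalars into plain
# Python ints/floats, which createDataFrame accepts with the schema above
data = [(name, (l1.tolist(), l2.tolist())) for name, (l1, l2) in obj]
df = spark.createDataFrame(data, schema)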
