I have the following data:
data = [
[
"2022-12-12",
["IND", "u1", [["auction_1", [[1,20], [2,12]]], ["auction_2", [[1,5], [2,7]]]]],
],
[
"2022-12-12",
["USA", "u2", [["auction_1", [[1,8], [2,12]]], ["auction_2", [[1,11], [2,4]]]]],
],
]
I have the following schema:
from pyspark.sql import functions as F, types as T

actionSchema = T.StructType([
T.StructField("amountSpent", T.LongType()),
T.StructField("timeSpent", T.LongType())
])
actionsSchema = T.StructType(
[T.StructField("action1", actionSchema), T.StructField("action2", actionSchema)]
)
userSchema = T.ArrayType(
T.StructType(
[
T.StructField("refferalId", T.StringType()),
T.StructField("actions", actionsSchema),
]
)
)
dataSchema = T.StructType(
[
T.StructField("country", T.StringType()),
T.StructField("userId", T.StringType()),
T.StructField("users", userSchema),
]
)
schema = T.StructType(
[T.StructField("date", T.StringType()), T.StructField("data", dataSchema)]
)
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
# it has the following schema
root
|-- date: string (nullable = true)
|-- data: struct (nullable = true)
| |-- country: string (nullable = true)
| |-- userId: string (nullable = true)
| |-- users: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- refferalId: string (nullable = true)
| | | |-- actions: struct (nullable = true)
| | | | |-- action1: struct (nullable = true)
| | | | | |-- amountSpent: long (nullable = true)
| | | | | |-- timeSpent: long (nullable = true)
| | | | |-- action2: struct (nullable = true)
| | | | | |-- amountSpent: long (nullable = true)
| | | | | |-- timeSpent: long (nullable = true)
I want the data in the following format for further analysis:
date, country, userId, refferalId, action, amountSpent, timeSpent
2022-12-31, IND, 123, 123213, action1, 5, 56
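For reference, here is a plain-Python model (no Spark involved) of the flattening being asked for, applied to the sample data above. It assumes the nesting implied by the schema: each inner [x, y] pair is one action struct (amountSpent, timeSpent), in action1/action2 order. The result is one row per referral per action, so 2 users × 2 referrals × 2 actions = 8 rows.

```python
# Plain-Python model of the target reshaping (illustration only, no Spark).
# Assumed nesting: [date, [country, userId, [[refferalId, [[amountSpent, timeSpent], ...]], ...]]]
data = [
    ["2022-12-12", ["IND", "u1", [["auction_1", [[1, 20], [2, 12]]], ["auction_2", [[1, 5], [2, 7]]]]]],
    ["2022-12-12", ["USA", "u2", [["auction_1", [[1, 8], [2, 12]]], ["auction_2", [[1, 11], [2, 4]]]]]],
]

ACTION_NAMES = ["action1", "action2"]  # field order of actionsSchema

rows = []
for date, (country, user_id, users) in data:
    # one iteration per element of the data.users array
    for refferal_id, actions in users:
        # one output row per action struct, tagged with its field name
        for name, (amount_spent, time_spent) in zip(ACTION_NAMES, actions):
            rows.append((date, country, user_id, refferal_id, name, amount_spent, time_spent))

for row in rows:
    print(row)
```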
display(df.select(F.explode("data")))
# cannot resolve 'explode(data)' due to data type mismatch: input to function explode should be an array or map type
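Any help would be much appreciated.
If a StructType can't be exploded, how can I get the data into the format above?
I also went through this question, but it didn't help much: Error while exploding a struct column in Spark
You have to explode data.users: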
df.select('date', 'data.country', 'data.userId', F.explode('data.users').alias('info'))
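For the actions, you would need a query like the one below (after exploding data.users):
.select('date', 'country', 'userId', 'info.refferalId', F.explode('info.actions').alias('actionInfo'))
But because you defined actions as a struct, it can't be exploded. If you change its schema to a list (an array), this code will work.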
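This is basically a matter of reshaping a bunch of nested data into the form you want, and you'll need a combination of pyspark.sql.functions to get there.
If we start from your df: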
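from pyspark.sql.functions import array, col, explode, lit, struct

output = (
    # unwrap the data struct and explode the data.users array
    df.select("date", "data.country", "data.userId", explode(col("data.users")).alias("users"))
    # unwrap the users struct
    .select("date", "country", "userId", "users.*")
    # tag each action struct with its name, put both in an array, explode the array
    .withColumn("actions", explode(array(
        struct("actions.action1.*", lit("action1").alias("action")),
        struct("actions.action2.*", lit("action2").alias("action")),
    )))
    # unwrap the exploded actions struct
    .select("date", "country", "userId", "refferalId", "actions.*")
)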
output.printSchema()
root
|-- date: string (nullable = true)
|-- country: string (nullable = true)
|-- userId: string (nullable = true)
|-- refferalId: string (nullable = true)
|-- amountSpent: long (nullable = true)
|-- timeSpent: long (nullable = true)
|-- action: string (nullable = false)
output.show()
+----------+-------+------+----------+-----------+---------+-------+
| date|country|userId|refferalId|amountSpent|timeSpent| action|
+----------+-------+------+----------+-----------+---------+-------+
|2022-12-12| IND| u1| auction_1| 1| 20|action1|
|2022-12-12| IND| u1| auction_1| 2| 12|action2|
|2022-12-12| IND| u1| auction_2| 1| 5|action1|
|2022-12-12| IND| u1| auction_2| 2| 7|action2|
|2022-12-12| USA| u2| auction_1| 1| 8|action1|
|2022-12-12| USA| u2| auction_1| 2| 12|action2|
|2022-12-12| USA| u2| auction_2| 1| 11|action1|
|2022-12-12| USA| u2| auction_2| 2| 4|action2|
+----------+-------+------+----------+-----------+---------+-------+
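Explanation, transformation by transformation:
- the first select statement unwraps the data struct and explodes the data.users array
- the second select statement unwraps the users struct
- the third statement, withColumn, is a bit more involved. At this point we have two structs (action1 and action2) with the same schema. What we do here is:
  - add a literal column action, holding the value action1 or action2, to each of those structs
  - put the two similar structs into an array with the array function
  - explode that array
- the fourth select statement unwraps the actions struct we just created

Hope this helps!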
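The problem is that you can't explode a struct; you can only explode arrays or maps. The first step is to explode data.users
(not just data). You can do it like this: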
users = (
    df
    .withColumn("s", F.explode("data.users"))
    .select("date", "data.country", "data.userId", "s.*")
)
users.show()
+----------+-------+------+----------+------------------+
| date|country|userId|refferalId| actions|
+----------+-------+------+----------+------------------+
|2022-12-12| IND| u1| auction_1|{{1, 20}, {2, 12}}|
|2022-12-12| IND| u1| auction_2| {{1, 5}, {2, 7}}|
|2022-12-12| USA| u2| auction_1| {{1, 8}, {2, 12}}|
|2022-12-12| USA| u2| auction_2| {{1, 11}, {2, 4}}|
+----------+-------+------+----------+------------------+
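From here you want to explode actions, but as before, you can't explode a struct. To get around this, you can convert it into an array of structs, each tagged with its action name.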
(
    users
    # one struct per action: its name plus the original action struct as "meta"
    .withColumn("actions", F.array([
        F.struct(
            F.lit(f"action{i}").alias("action"),
            F.col("actions")[f"action{i}"].alias("meta"),
        ) for i in [1, 2]
    ]))
    .withColumn("action", F.explode("actions"))
    .select("date", "country", "userId", "refferalId", "action.action", "action.meta.*")
    .show()
)
+----------+-------+------+----------+-------+-----------+---------+
| date|country|userId|refferalId| action|amountSpent|timeSpent|
+----------+-------+------+----------+-------+-----------+---------+
|2022-12-12| IND| u1| auction_1|action1| 1| 20|
|2022-12-12| IND| u1| auction_1|action2| 2| 12|
|2022-12-12| IND| u1| auction_2|action1| 1| 5|
|2022-12-12| IND| u1| auction_2|action2| 2| 7|
|2022-12-12| USA| u2| auction_1|action1| 1| 8|
|2022-12-12| USA| u2| auction_1|action2| 2| 12|
|2022-12-12| USA| u2| auction_2|action1| 1| 11|
|2022-12-12| USA| u2| auction_2|action2| 2| 4|
+----------+-------+------+----------+-------+-----------+---------+