PySpark array column



我有一个PySpark DataFrame,其中包含一个图书集合,每个图书可以有一个或多个标题。每个标题被归类为原始标题(OT)或备选标题(AT)。为简单起见,我省略了其他标题类型。我的验证需要确保每本书只有一个OT标题,可以有任意数量的AT标题。

我要做的是清理数据,以便:

  • 如果一本书有多个OT标题,保留第一个,其余改为AT
  • 如果一本书没有OT标题,将第一个AT标题更改为OT
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import collect_list, col, struct
data = ([
(1, 'Title 1', 'OT'),
(1, 'Title 2', 'OT'),
(2, 'Title 3', 'AT'),
(2, 'Title 4', 'OT'),
(3, 'Title 5', 'AT'),
])
schema = StructType([ 
StructField("BookID", IntegerType(), False),
StructField("Title", StringType(), True), 
StructField("Type", StringType(), True),
])
df = spark.createDataFrame(data, schema)
df = df.groupby('BookID').agg(collect_list(struct(col('Title'), col('Type'))).alias('Titles'))
display(df)

听起来应该很容易,但是我有点不知道该怎么做。如有任何帮助,我将不胜感激。

我已经尝试使用下面的udf,但到目前为止,这种方法不起作用。我得到一个错误说一个lambda cannot contain assignment.

def process_titles(titles):
x = list(filter(lambda t: t.Type == 'OT', titles))[1::]
map(lambda t: t.Type = 'AT', x)

return x
process_titles_udf = udf(lambda x: process_titles(x), titles)
df = df.withColumn('test', process_titles_udf('Titles'))

,其中udf返回类型为

的对象
titles = ArrayType(StructType([ 
StructField("Title", StringType(), True), 
StructField("Type", StringType(), True)
]))

首先,当您说"保留第一项"时,您必须知道collect_list是不确定的。所以根据你的运行情况,你可能有不同的"第一"不。

如果您想继续这种不确定的行为,下面是您的UDF:

@udf(titles)
def process_titles(titles):
OTs = [x for x in titles if x["Type"] == "OT"]  # Collect all OT types
if OTs:
OT = OTs[0]  # Keep the first OT as only OT if it exists
else:
OT = {
"Title": titles[0]["Title"],
"Type": "OT",
}  # otherwise, use the first AT as OT
ATs = [
{"Title": x["Title"], "Type": "AT"} for x in titles if x["Title"] != OT["Title"]
]  # Transform all other titles as AT
return [OT] + ATs

df.select("titles", process_titles(F.col("Titles"))).show(truncate=False)
+------------------------------+------------------------------+                 
|titles                        |process_titles(Titles)        |
+------------------------------+------------------------------+
|[[Title 1, OT], [Title 2, OT]]|[[Title 1, OT], [Title 2, AT]]|
|[[Title 5, AT]]               |[[Title 5, OT]]               |
|[[Title 3, AT], [Title 4, OT]]|[[Title 4, OT], [Title 3, AT]]|
+------------------------------+------------------------------+

最新更新