我有一个PySpark DataFrame,其中包含一个图书集合,每个图书可以有一个或多个标题。每个标题被归类为原始标题(OT
)或备选标题(AT
)。为简单起见,我省略了其他标题类型。我的验证需要确保每本书只有一个OT
标题,可以有任意数量的AT
标题。
我要做的是清理数据,以便:
- 如果一本书有多个
OT
标题,保留第一个,其余改为AT
- 如果一本书没有
OT
标题,将第一个AT
标题更改为OT
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import collect_list, col, struct
data = ([
(1, 'Title 1', 'OT'),
(1, 'Title 2', 'OT'),
(2, 'Title 3', 'AT'),
(2, 'Title 4', 'OT'),
(3, 'Title 5', 'AT'),
])
schema = StructType([
StructField("BookID", IntegerType(), False),
StructField("Title", StringType(), True),
StructField("Type", StringType(), True),
])
df = spark.createDataFrame(data, schema)
df = df.groupby('BookID').agg(collect_list(struct(col('Title'), col('Type'))).alias('Titles'))
display(df)
听起来应该很容易,但是我有点不知道该怎么做。如有任何帮助,我将不胜感激。
我已经尝试使用下面的udf,但到目前为止,这种方法不起作用。我得到一个错误说一个lambda cannot contain assignment
.
def process_titles(titles):
x = list(filter(lambda t: t.Type == 'OT', titles))[1::]
map(lambda t: t.Type = 'AT', x)
return x
process_titles_udf = udf(lambda x: process_titles(x), titles)
df = df.withColumn('test', process_titles_udf('Titles'))
,其中udf
返回类型为
titles = ArrayType(StructType([
StructField("Title", StringType(), True),
StructField("Type", StringType(), True)
]))
首先,当您说"保留第一项"时,您必须知道collect_list
是不确定的。所以根据你的运行情况,你可能有不同的"第一"不。
如果您想继续这种不确定的行为,下面是您的UDF:
@udf(titles)
def process_titles(titles):
OTs = [x for x in titles if x["Type"] == "OT"] # Collect all OT types
if OTs:
OT = OTs[0] # Keep the first OT as only OT if it exists
else:
OT = {
"Title": titles[0]["Title"],
"Type": "OT",
} # otherwise, use the first AT as OT
ATs = [
{"Title": x["Title"], "Type": "AT"} for x in titles if x["Title"] != OT["Title"]
] # Transform all other titles as AT
return [OT] + ATs
df.select("titles", process_titles(F.col("Titles"))).show(truncate=False)
+------------------------------+------------------------------+
|titles |process_titles(Titles) |
+------------------------------+------------------------------+
|[[Title 1, OT], [Title 2, OT]]|[[Title 1, OT], [Title 2, AT]]|
|[[Title 5, AT]] |[[Title 5, OT]] |
|[[Title 3, AT], [Title 4, OT]]|[[Title 4, OT], [Title 3, AT]]|
+------------------------------+------------------------------+