通过比较同一数据帧中两个不同列之间的数据,填充pyspark数据帧中的一列



目标:实现此查询

select *, 
case when new_x != x or new_y != y then 'some_status_change' else cdc_status end as cdc_status
from dataframe where cdc_status = 'noUpdateRequired'

我正在尝试使用pyspark(3.0.0(和spark(2.4.4(来实现这个逻辑,我现在有这个

df = df.withColumn("cdc_status",
F.when(((F.col('cdc_status') == 'noUpdateRequired')
& (F.col('new_autoapproveind') != F.col('autoapproveind')
| F.col('new_preferpathway') != F.col('preferpathway'))), 'pathwayChange'))

但这给我带来了以下错误

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions

因此,基本上我需要一个能够更新列cdc_status的解决方案,其中new_x != x or new_y != y where cdc_status = 'noUpdateRequired'

df.printSchema()
root
|-- new_autoapproveind: string (nullable = true)
|-- new_preferpathway: string (nullable = true)
|-- autoapproveind: string (nullable = true)
|-- preferpathway: string (nullable = true)
|-- cdc_status: string (nullable = true)

我在打印模式时删除了一些列,因为它们本质上是敏感的,但本质上它们也都是基于字符串的列。

我试着到处搜索,但在pyspark中找不到相同的解决方案。scala=!=运算符,但没有pyspark运算符。

虽然我在其他情况下可以使用when函数,但这里有F.col('cdc_status') != 'some value'),它是一个静态值,但这里我需要在列之间进行比较,然后在cdc_status列中填充/更新值。

任何帮助都将不胜感激!

您的条件中缺少括号,这导致了错误。也就是说,您的Python代码可能不等同于SQL查询。您应该在添加新列之前进行筛选,还应该添加otherwise子句。例如

import pyspark.sql.functions as F
df2 = df.filter("cdc_status = 'noUpdateRequired'").withColumn(
'cdc_status',
F.when(
(F.col('new_autoapproveind') != F.col('autoapproveind'))
| 
(F.col('new_preferpathway') != F.col('preferpathway')),
'some_status_change'
).otherwise(
F.col('cdc_status')
)
)

最新更新