目标:实现此查询
select *,
case when new_x != x or new_y != y then 'some_status_change' else cdc_status end as cdc_status
from dataframe where cdc_status = 'noUpdateRequired'
我正在尝试使用pyspark(3.0.0(和spark(2.4.4(来实现这个逻辑,我现在有这个
df = df.withColumn("cdc_status",
F.when(((F.col('cdc_status') == 'noUpdateRequired')
& (F.col('new_autoapproveind') != F.col('autoapproveind')
| F.col('new_preferpathway') != F.col('preferpathway'))), 'pathwayChange'))
但这给我带来了以下错误
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions
因此,基本上我需要一个能够更新列cdc_status
的解决方案,其中new_x != x or new_y != y where cdc_status = 'noUpdateRequired'
df.printSchema()
root
|-- new_autoapproveind: string (nullable = true)
|-- new_preferpathway: string (nullable = true)
|-- autoapproveind: string (nullable = true)
|-- preferpathway: string (nullable = true)
|-- cdc_status: string (nullable = true)
我在打印模式时删除了一些列,因为它们本质上是敏感的,但本质上它们也都是基于字符串的列。
我试着到处搜索,但在pyspark
中找不到相同的解决方案。scala
有=!=
运算符,但没有pyspark
运算符。
虽然我在其他情况下可以使用when
函数,但这里有F.col('cdc_status') != 'some value')
,它是一个静态值,但这里我需要在列之间进行比较,然后在cdc_status
列中填充/更新值。
任何帮助都将不胜感激!
您的条件中缺少括号,这导致了错误。也就是说,您的Python代码可能不等同于SQL查询。您应该在添加新列之前进行筛选,还应该添加otherwise
子句。例如
import pyspark.sql.functions as F
df2 = df.filter("cdc_status = 'noUpdateRequired'").withColumn(
'cdc_status',
F.when(
(F.col('new_autoapproveind') != F.col('autoapproveind'))
|
(F.col('new_preferpathway') != F.col('preferpathway')),
'some_status_change'
).otherwise(
F.col('cdc_status')
)
)