pyspark仅当列存在时才使用column表达式


%python
def has_column(df, col):
try:
df[col]
return True
except AnalysisException:
return False

df = spark.createDataFrame([ 
("C","I"), 
("I","I"), 
("C","B"), 
], ["B2B","E1J"])
df.show()

+---+---+
|B2B|E1J|
+---+---+
|  C|  I|
|  I|  I|
|  C|  B|
+---+---+

现在我想做的是:检查一列是否存在,并且仅当它存在时,然后检查它的值,并在此基础上为标志列赋值。只要在有效列上进行检查,这就可以正常工作,如下所示

df.withColumn("flag",when( ((lit(has_column(df, "B2B"))) & (col("B2B")=="C") ) , 1).otherwise(0)).show()

+---+---+----+
|B2B|E1J|flag|
+---+---+----+
|  C|  I|   1|
|  I|  I|   0|
|  C|  B|   1|
+---+---+----+

我遇到的问题是,这些检查条件不是静态的,而是从外部文件中读取并动态生成的,它可能有实际数据帧没有的列,并导致如下错误。

有什么解决办法吗?

例如:

df.withColumn("flag", 
when( 
(lit(has_column(df, "GBC"))) & (col("GBC")=="C") |   
(lit(has_column(df, "B2B"))) & (col("B2B")=="C")     
, 1))   
.otherwise(0).show()

org.apache.spark.sql.AnalysisException: cannot resolve '`GBC`' given input columns: [B2B, E1J];;

错误是由col('GBC')引起的。您可以使用以下代码对可能不存在的列进行预测。

import pyspark.sql.functions as F
def for_exist_column(df, col, pre):
if col in df.columns:
return pre(df[col])
else:
return F.lit(False)

df = spark.createDataFrame([ 
("C","I"), 
("I","I"), 
("C","B"), 
], ["B2B","E1J"])
df.show()
df.withColumn("flag",F.when(for_exist_column(df, 'B2B', lambda c: c=='C'), 1).otherwise(0)).show()
df.withColumn("flag", F.when(for_exist_column(df, 'GBC', lambda c: c=='C') | for_exist_column(df, 'B2B', lambda c: c=='C'), 1).otherwise(0)).show()

最新更新