使用python在spark中进行架构验证


Validate_shema(df, dic)
Df2=df.withcolumn('typ_freq',when(df.schema.["Frequency"].dataType != dic["Frequency"], False). Otherwise ('true')
Df2=df.withcolumn('typ_region',when(df.schema.["Region"].dataType != dic["Region"], False). Otherwise ('true')
Df2.show()

它给了我错误-条件必须是一列。

尽管如此,当我尝试验证长度时,比如-df.withcolumn("len_freq",当(length(df["freq"](>dic["freq],False(。否则(True(这成功地起到了作用。

有人能告诉解决方案为什么数据类型1不起作用吗?

对于spark中的模式验证,我建议使用Cerberus库(https://docs.python-cerberus.org/en/stable/)-有一个关于使用Cerberus with Spark的精彩教程:https://www.waitingforcode.com/apache-spark/validating-json-apache-spark-cerberus/read

根据当前解决方案不起作用的原因,您需要转换条件以处理列类型,可能需要使用lit函数(https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit)-类似于:

import pyspark.sql.functions as F
df = df.withColumn("data_type", F.lit(df.schema.["Frequency"].dataType))
df = df.withcolumn('typ_freq',F.when(F.col("data_type") != dic["Frequency"], False).otherwise('true')

祝你好运!

相关内容

  • 没有找到相关文章

最新更新