PySpark数据帧添加列(如果不存在)



我在各种JSON文件中都有JSON数据,而键可能会有所不同,例如

{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}

我想在" b"列,'c','d'和'f'列上进行aggreagate数据,这些数据在给定的JSON文件中不存在,但可以存在于其他文件中。因此,由于不存在列" f",我们可以为该列取空字符串。

我正在读取输入文件并像这样汇总数据

import pyspark.sql.functions as f
df =  spark.read.json(inputfile)
df2 =df.groupby("b","c","d","f").agg(f.sum(df["a"]))

这是我想要的最终输出

{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}

有人可以帮忙吗?预先感谢!

您可以在数据框架中检查COLUM是否可用,并在必要时仅修改df

if 'f' not in df.columns:
   df = df.withColumn('f', f.lit(''))

对于嵌套模式,您可能需要使用df.schema如下:

>>> df.printSchema()
root
 |-- a: struct (nullable = true)
 |    |-- b: long (nullable = true)
>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False

以防万一有人在scala中需要它:

if (!df.columns.contains("f")) {
  val newDf = df.withColumn("f", lit(""))
}

这个功能对我而言。

    def detect_data(column, df, data_type):
          if not column in df.columns:
            ret = lit(None).cast(data_type)
          else:
            ret = col(column).cast(data_type)
            
          return ret
    df = df.withColumn('f', detect_data('f', df, StringType()))

这是一个火花函数,您可以在 df.transform(f)

中使用它
def addMissingColumn(
      colName: String,
      defaultColumn: Column = lit(null).cast(StringType)
    ): DataFrame => DataFrame = { df =>
      val noInfoPresent = !df.columns.toSeq.contains(colName)
      val dfUpdated = if (noInfoPresent) {
        df.withColumn(colName, defaultColumn)
      } else { df }
      dfUpdated
    }

相关内容

  • 没有找到相关文章

最新更新