Scala Spark: flatten a nested schema that contains arrays

I have a nested schema which contains arrays:

 root
  |-- alarm_time: string (nullable = true)
  |-- alarm_id: string (nullable = true)
  |-- user: struct (nullable = true)
  |    |-- name: string (nullable = true)
  |    |-- family: string (nullable = true)
  |    |-- address: struct (nullable = true)
  |    |    |-- postalcode: string (nullable = true)
  |    |    |-- line1: string (nullable = true)
  |    |    |-- city: string (nullable = true)
  |    |    |-- country: string (nullable = true) 
  |-- device: struct (nullable = true)
  |    |-- device_usage: string (nullable = true)
  |    |-- device_id: string (nullable = true)  
  |-- alarm_info: struct (nullable = true)
  |    |-- type: string (nullable = true)
  |    |-- reason: string (nullable = true)
  |    |-- data: struct (nullable = true)
  |    |    |-- alarm_severity: long (nullable = true)
  |    |    |-- extra_info: array (nullable = true)
  |    |    |    |-- element: struct (containsNull = true)
  |    |    |    |    |-- producer: string (nullable = true)
  |    |    |    |    |-- comment: string (nullable = true)

I used to ignore the array fields and flatten my schema with this code:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName) // recurse into structs
      case _ => Array(col(colName))                     // scalars (and arrays) selected as-is
    }
  })
}

and used it like df.select(flattenSchema(df.schema): _*). But now I have a use case where I also need to keep the array data. The only thing I can think of is to explode the arrays and keep multiple rows, but I've had no luck. Since I pass the columns as varargs to select, I can't thread another parameter through.
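For reference, this is roughly how it is wired up end to end; a minimal sketch, assuming df is read from a JSON source with the schema above (the path is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("flatten-alarms").getOrCreate()
// Hypothetical input path; any source that yields the schema above works.
val df = spark.read.json("/path/to/alarms.json")
// Structs are flattened into top-level columns; the extra_info array falls
// into the default case and is kept as a single array column.
val flatDf = df.select(flattenSchema(df.schema): _*)
flatDf.printSchema()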

How can I achieve this (flatten the schema while exploding the arrays)?

am1rr3za, the provided solution breaks if we have two arrays at the same level. It won't allow two explodes at once: "Only one generator allowed per select clause but found 2: explode(_1), explode(_2)"
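The failure is easy to reproduce; a minimal sketch with a hypothetical two-array DataFrame, assuming a SparkSession in scope as spark (the _1/_2 column names come from the tuple, matching the error text):

import spark.implicits._
import org.apache.spark.sql.functions.explode

val pairs = Seq((Seq(1, 2), Seq("a", "b"))).toDF() // columns _1 and _2
// Throws AnalysisException: "Only one generator allowed per select clause
// but found 2: explode(_1), explode(_2)"
pairs.select(explode($"_1"), explode($"_2")).show()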

I have updated the solution to keep track of complex types while flattening:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{ArrayType, MapType, MetadataBuilder, StructType}

def flattenDataFrame(df: DataFrame): DataFrame = {
  if (isNested(df)) {
    val flattenedSchema: Array[(Column, Boolean)] = flattenSchema(df.schema)
    var simpleColumns: List[Column] = List.empty[Column]
    var complexColumns: List[Column] = List.empty[Column]
    flattenedSchema.foreach {
      case (col, isComplex) =>
        if (isComplex) {
          complexColumns = complexColumns :+ col
        } else {
          simpleColumns = simpleColumns :+ col
        }
    }
    // Select the simple columns once, then cross-join each exploded array
    // column back in and recurse until nothing is nested.
    var crossJoinedDataFrame = df.select(simpleColumns: _*)
    complexColumns.foreach(col => {
      crossJoinedDataFrame = crossJoinedDataFrame.crossJoin(df.select(col))
      crossJoinedDataFrame = flattenDataFrame(crossJoinedDataFrame)
    })
    crossJoinedDataFrame
  } else {
    df
  }
}
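Note the design choice: every array is exploded in its own single-generator select and cross-joined back, so two arrays at the same level produce a Cartesian product rather than failing (or being zipped positionally). A quick sanity check with a hypothetical input:

import spark.implicits._

val df2 = Seq((0, Seq(1, 2), Seq("a", "b"))).toDF("id", "xs", "ys")
// Each array is exploded in a separate select and cross-joined back in,
// so the two arrays multiply: 2 * 2 = 4 rows, no two-generator error.
flattenDataFrame(df2).show()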

private def flattenSchema(schema: StructType, prefix: String = null): Array[(Column, Boolean)] = {
  schema.fields.flatMap(field => {
    val columnName = if (prefix == null) field.name else prefix + "." + field.name
    field.dataType match {
      case arrayType: ArrayType =>
        // Arrays are exploded and flagged as complex so the caller can
        // cross-join them one at a time (one generator per select).
        Array((explode_outer(col(columnName)).as(columnName.replace(".", "_")), true))
      case structType: StructType =>
        flattenSchema(structType, columnName)
      case _ =>
        val columnNameWithUnderscores = columnName.replace(".", "_")
        val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()
        Array((col(columnName).as(columnNameWithUnderscores, metadata), false))
    }
  })
}

def isNested(df: DataFrame): Boolean = {
  df.schema.fields.exists(field =>
    field.dataType match {
      case _: ArrayType | _: MapType | _: StructType => true
      case _ => false
    })
}

So what I'm doing now (Spark 2.2) is checking whether the schema is nested, and calling flattenSchema over and over until it is completely flat.

  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.{col, explode_outer}
  import org.apache.spark.sql.types.{ArrayType, MapType, MetadataBuilder, StructType}

  def makeItFlat(df: DataFrame): DataFrame = {
    if (isSchemaNested(df)) {
      val flattenedSchema = flattenSchema(df.schema)
      makeItFlat(df.select(flattenedSchema: _*))
    }
    else {
      df
    }
  }

makeItFlat() is a recursive method.
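Applied to the schema from the question, repeated passes converge to a fully flat schema; a sketch of the expected outcome (abbreviated, column order follows the original field order):

val flat = makeItFlat(df)
flat.printSchema()
// root
//  |-- alarm_time: string
//  |-- alarm_id: string
//  |-- user_name: string
//  |-- user_family: string
//  |-- user_address_postalcode: string
//  ...
//  |-- alarm_info_data_extra_info_producer: string
//  |-- alarm_info_data_extra_info_comment: string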

  def isSchemaNested(df: DataFrame): Boolean = {
    df.schema.fields.exists(field =>
      field.dataType match {
        case _: ArrayType | _: MapType | _: StructType => true
        case _ => false
      })
  }

The job of isSchemaNested is to check whether there are any nested data types in the schema's definition.

  private def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
    schema.fields.flatMap(field => {
      val columnName = if (prefix == null) field.name else prefix + "." + field.name
      field.dataType match {
        case arrayType: ArrayType =>
          // Explode arrays; explode_outer keeps rows whose array is null or empty.
          Array[Column](explode_outer(col(columnName)).as(columnName.replace(".", "_")))
        case mapType: MapType =>
          Array.empty[Column] // maps are dropped entirely
        case structType: StructType =>
          flattenSchema(structType, columnName)
        case _ =>
          val columnNameWithUnderscores = columnName.replace(".", "_")
          val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()
          Array(col(columnName).as(columnNameWithUnderscores, metadata))
      }
    })
  }
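One detail worth calling out: explode_outer (rather than explode) keeps rows whose array is null or empty, so alarms without extra_info survive the flattening. A minimal illustration with a hypothetical DataFrame, assuming a SparkSession in scope as spark:

import spark.implicits._
import org.apache.spark.sql.functions.{explode, explode_outer}

val alarms = Seq((1, Seq("x")), (2, Seq.empty[String])).toDF("id", "tags")
alarms.select($"id", explode($"tags")).count()       // 1: the empty-array row is dropped
alarms.select($"id", explode_outer($"tags")).count() // 2: the empty-array row is kept with null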
