有没有办法为Spark数据框添加额外的元数据?



是否可以添加额外的元数据到DataFrame s?

的原因

我有Spark DataFrame s,我需要保留额外的信息。例如:一个DataFrame,我想"记住"在Integer id列中使用的最高索引。

当前解决方案

我使用单独的DataFrame来存储此信息。当然,单独保存这些信息既繁琐又容易出错。

是否有更好的解决方案来存储DataFrame s上的这些额外信息?

要扩展和scala -fy nealmcb的答案(问题标记为scala,而不是python,所以我不认为这个答案会跑题或多余),假设您有一个DataFrame:

import org.apache.spark.sql
val df = sc.parallelize(Seq.fill(100) { scala.util.Random.nextInt() }).toDF("randInt")

还有一种方法可以让你在DataFrame中记住最大值或其他值:

val randIntMax = df.rdd.map { case sql.Row(randInt: Int) => randInt }.reduce(math.max)

sql.types.Metadata只能保存字符串、布尔值、某些类型的数字和其他元数据结构。所以我们必须使用Long:

val metadata = new sql.types.MetadataBuilder().putLong("columnMax", randIntMax).build()

DataFrame.withColumn()实际上有一个允许在末尾提供元数据参数的重载,但它被莫名其妙地标记为[private],所以我们只是做它所做的-使用Column.as(alias, metadata):

val newColumn = df.col("randInt").as("randInt_withMax", metadata)
val dfWithMax = df.withColumn("randInt_withMax", newColumn)

dfWithMax现在有你想要的元数据了!

dfWithMax.schema.foreach(field => println(s"${field.name}: metadata=${field.metadata}"))
> randInt: metadata={}
> randInt_withMax: metadata={"columnMax":2094414111}

或编程和类型安全(sort of;Metadata.getLong()和其他方法不返回Option,并可能抛出"key not found"异常):

dfWithMax.schema("randInt_withMax").metadata.getLong("columnMax")
> res29: Long = 209341992

将max附加到列在您的情况下是有意义的,但在将元数据附加到DataFrame而不是特定列的一般情况下,似乎您必须采用其他答案所描述的包装器路由。

从Spark 1.2开始,StructType模式有一个metadata属性,它可以保存Dataframe中每个列的任意映射/信息字典。例如(当与单独的spark-csv库一起使用时):

customSchema = StructType([
  StructField("cat_id", IntegerType(), True,
    {'description': "Unique id, primary key"}),
  StructField("cat_title", StringType(), True,
    {'description': "Name of the category, with underscores"}) ])
categoryDumpDF = (sqlContext.read.format('com.databricks.spark.csv')
 .options(header='false')
 .load(csvFilename, schema = customSchema) )
f = categoryDumpDF.schema.fields
["%s (%s): %s" % (t.name, t.dataType, t.metadata) for t in f]
["cat_id (IntegerType): {u'description': u'Unique id, primary key'}",
 "cat_title (StringType): {u'description': u'Name of the category, with underscores.'}"]

这是在[SPARK-3569]中添加到StructField - ASF JIRA的元数据字段,并设计用于机器学习管道,以跟踪存储在列中的特征信息,如分类/连续,数字类别,类别到索引映射。参见SPARK-3569:添加元数据字段到StructField设计文档。

我希望看到它被更广泛地使用,例如用于列的描述和文档,列中使用的度量单位,坐标轴信息等。

问题包括如何在转换列时适当地保留或操作元数据信息,如何处理多种元数据,如何使其全部可扩展等。

对于那些考虑在Spark数据框架中扩展此功能的人来说,我引用了一些关于Pandas的类似讨论。

例如,参见xray -将熊猫的标记数据能力带到支持标记数组元数据的物理科学。

并参见允许自定义元数据附加到panel/df/series中关于Pandas元数据的讨论?·第2485期·pydata/pandas.

请参阅有关单位的讨论:ENH:度量单位/物理量·Issue #10349·pydata/pandas

如果您想减少繁琐的工作,我认为您可以在DataFrame和自定义包装器之间添加隐式转换(虽然尚未测试)。

   implicit class WrappedDataFrame(val df: DataFrame) {
        var metadata = scala.collection.mutable.Map[String, Long]()
        def addToMetaData(key: String, value: Long) {
           metadata += key -> value
        }
     ...[other methods you consider useful, getters, setters, whatever]...
      }

如果隐式包装器在DataFrame的作用域内,你可以直接使用普通的DataFrame,就像它是你的包装器一样,即:

df.addtoMetaData("size", 100)

这种方式还使元数据可变,因此您不应该被迫只计算一次并携带它。

我会在数据框架周围存储一个包装器。例如:

case class MyDFWrapper(dataFrame: DataFrame, metadata: Map[String, Long])
val maxIndex = df1.agg("index" ->"MAX").head.getLong(0)
MyDFWrapper(df1, Map("maxIndex" -> maxIndex))

很多人看到"元数据"这个词就直接想到"列元数据"。这似乎不是你想要的,也不是我想要的,当我有一个类似的问题。最终,这里的问题是DataFrame是一个不可变的数据结构,无论何时对其执行操作,数据都会传递,但DataFrame的其余部分不会。这意味着你不能简单地在它上面放一个包装器,因为一旦你执行一个操作,你就得到了一个全新的DataFrame(可能是一个全新的类型,特别是Scala/Spark倾向于隐式转换)。最后,如果DataFrame转义了它的包装器,就没有办法从DataFrame重构元数据。

我在Spark Streaming中遇到这个问题,它关注RDD (DataFrame的底层数据结构),并得出一个简单的结论:存储元数据的唯一位置是在RDD的名称中。除了报告之外,核心Spark系统永远不会使用RDD名称,因此重新使用它是安全的。然后,您可以基于RDD名称创建包装器,并在任意 DataFrame和包装器之间进行显式转换,并使用元数据完成。

不幸的是,这仍然会给您留下不可变性和每次操作都会创建新rdd的问题。RDD名称(我们的元数据字段)随着每个新的RDD而丢失。这意味着您需要一种方法将名称重新添加到新的RDD中。这可以通过提供一个将函数作为参数的方法来解决。它可以提取函数之前的元数据,调用函数并获得新的RDD/DataFrame,然后用元数据命名它:

def withMetadata(fn: (df: DataFrame) => DataFrame): MetaDataFrame = {
  val meta = df.rdd.name
  val result = fn(wrappedFrame)
  result.rdd.setName(meta)
  MetaDataFrame(result)
}

您的包装类(MetaDataFrame)可以为解析和设置元数据值提供方便的方法,以及在Spark DataFrame和MetaDataFrame之间来回隐式转换。只要您通过withMetadata方法运行所有的突变,您的元数据就会在整个转换管道中继续运行。对每个调用使用这种方法有点麻烦,是的,但简单的现实是,Spark中没有一流的元数据概念。

相关内容

  • 没有找到相关文章

最新更新