比较 Spark 中的两个架构(列名 + 可为空)



我知道如何使用zip+forall比较 Scala 中的两个列表。

我的问题是我们如何比较两种DataFrame模式。也就是说,我们希望将列名与其可为 null 的属性相匹配。

我的想法是使用哈希映射来存储{列名:可为空},并进行比较。我想它有效,但是还有其他惯用方法吗?

首先,你应该检索你想要比较的元素,正如Tom Lous在他的回答中所说:

val s1 = df1.schema.fields.map(f => (f.name, f.nullable))
val s2 = df2.schema.fields.map(f => (f.name, f.nullable))

然后你可以使用 List 中的diff方法,它将返回差异,如果该方法返回并空列表,则没有差异,否则有:

s1.diff(s2).isEmpty

返回:如果未找到差异,则为 true,否则为 false。

请考虑,当字段存在于一个列表中但不存在于另一个列表中时,diff 方法不返回任何差异。因此,您可能需要附加第二个条件来比较长度

s1.diff(s2).isEmpty && s1.length == s2.length

只有当你确定列和顺序的数量相同时,你才能使用zip&forall。还是我错过了什么?

无论如何,我想这将是这样做的方法:

val s1 = df1.schema.fields.map(f => (f.name, f.nullable))
val s2 = df2.schema.fields.map(f => (f.name, f.nullable))
val res = s1 zip s2 forall {
case ((f1, n1), (f2,n2)) => f1 == f2 && n1 ==  n2
}

您甚至可以对s1s2进行排序以确保字段名称对齐,但这仍然感觉很棘手。

否则我会使用diff方法。需要注意的是,这是一个片面的观点。df2 中确实存在的缺失字段将被忽略

val res = (s1 diff s2).length == 0

如果需要比较两个模式,原则上,你只需要像往常一样比较它们:

val equal: Boolean = (s1 == s2)

但是,既然你在问,我想你遇到了某种涉及nullable的麻烦,如果你使用镶木地板,这是很常见的。我解释:

当您使用 Parquet 时,尽管架构存在于 Parquet 文件中,但当您读取 Parquet 文件时,Spark 会忽略nullable,将它们全部设置为true。这样,在您写下 Parquet 文件之前,该文件的架构可能与您在读取同一 Parquet 文件之后的架构不完全匹配。这不是错误,它是记录在案的功能。

在下面的示例中,我递归比较两个架构,忽略了nullable设置。

从技术上讲,这不是您所要求的,但是您可以轻松调整代码。只需调整标有//-- comparison的行。

package object my.spark.utilities.schema {
import org.apache.spark.sql.types.StructType
implicit class StructTypeExtension(schema: StructType) {
import org.apache.spark.sql.types.StructField 
implicit def similar(other: StructType): Boolean = _similar(schema, other)
private final def _similar(_this: StructType, _other: StructType): Boolean =
if(_this.fields.size != _other.fields.size) false
else
(_this.fields zip _other.fields)
.forall { case (t, o) => _similar(t, o) }
private final def _similar(_this: StructField, _other: StructField): Boolean =
if((_this.dataType.typeName == "struct") && (_other.dataType.typeName == "struct"))
_similar(_this.dataType.asInstanceOf[StructType], _other.dataType.asInstanceOf[StructType])
else
(_this.name == _other.name) && (_this.dataType == _other.dataType) //-- comparison
}
}

然后查找架构是否相似:

import my.spark.utilities.schema._
val similar: Boolean = s1.similar(s2)

最新更新