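Error: Union can only be performed on tables with the compatible column types. struct<tier:string,skyward_number:string,skyward_points:string> <> struct<skyward_number:string,tier:string,skyward_points:string> at the first column of the second table;
Here the order of the struct fields differs, but everything else is the same.
dataframe1 schema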
root
|-- emcg_uuid: string (nullable = true)
|-- name: string (nullable = true)
|-- phone_no: string (nullable = true)
|-- dob: string (nullable = true)
|-- country: string (nullable = true)
|-- travel_type: string (nullable = true)
|-- gdpr_restricted_flg: string (nullable = false)
|-- gdpr_reason_code: string (nullable = false)
|-- document: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- skyward: struct (nullable = false)
| |-- tier: string (nullable = false)
| |-- skyward_number: string (nullable = false)
| |-- skyward_points: string (nullable = false)
dataframe2 schema
root
|-- emcg_uuid: string (nullable = true)
|-- name: string (nullable = true)
|-- phone_no: string (nullable = true)
|-- dob: string (nullable = true)
|-- country: string (nullable = true)
|-- travel_type: string (nullable = true)
|-- gdpr_restricted_flg: string (nullable = true)
|-- gdpr_reason_code: string (nullable = true)
|-- document: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- skyward: struct (nullable = false)
| |-- skyward_number: string (nullable = false)
| |-- tier: string (nullable = false)
| |-- skyward_points: string (nullable = false)
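How can I solve this?
The default Spark behavior of union is standard SQL behavior, so columns are matched by position. This means the schemas of both DataFrames must contain the same fields, in the same order.
If you want to match columns by name instead, use unionByName, which was introduced in Spark 2.3.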
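A minimal sketch of name-based matching, assuming a local SparkSession and made-up two-column data (the names left, right and merged are only for illustration). As far as I can tell, this name matching applies to top-level columns, so a struct whose inner fields are reordered, like skyward in the question, may still need to be rebuilt separately.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("union-by-name-sketch")
  .getOrCreate()
import spark.implicits._

// Same top-level columns, declared in a different order (made-up data).
val left  = Seq(("uuid_1", "ravi")).toDF("emcg_uuid", "name")
val right = Seq(("aaaa", "uuid_2")).toDF("name", "emcg_uuid")

// unionByName resolves top-level columns by name, so "name" lines up with
// "name" even though it sits at a different position in `right`.
val merged = left.unionByName(right)
merged.show(false)
```

A plain left.union(right) on the same inputs would pair the columns by position and put uuid_2 under name.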
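You can also remap the fields:
val df1 = ...
val df2 = ...
// Note: toDF renames df1's columns by position to df2's column names; it does not reorder the data.
df1.toDF(df2.columns: _*).union(df2)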
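Edit: I see the edit to the question now.
You can re-add the struct column with its fields in the required order:
import org.apache.spark.sql.functions._
// $"..." needs import spark.implicits._ (assumes a SparkSession named spark).
// The fields live inside the skyward struct, so they are addressed as skyward.<field>.
val withCorrectedStruct = df1.withColumn("skyward", struct($"skyward.skyward_number", $"skyward.tier", $"skyward.skyward_points"))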
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

// Preserves the order of the struct fields while doing the union.
// Walks the top-level columns of df1 and rebuilds each struct column of df2
// so that its fields follow df1's field order.
def getStructRecursiveDataFrame(df1: DataFrame, df2: DataFrame, columns: Array[String]): DataFrame = {
  if (columns.isEmpty) {
    df2
  } else {
    val col_name = columns.head
    val col_schema = df1.schema.fields.find(_.name == col_name).get
    if (col_schema.dataType.typeName.equals("struct")) {
      val updatedStructNames: Seq[Column] = col_schema.dataType.asInstanceOf[StructType].fieldNames.map(name => col(col_name + "." + name)).toSeq
      getStructRecursiveDataFrame(df1, df2.withColumn(col_name, struct(updatedStructNames: _*)), columns.tail)
    } else {
      getStructRecursiveDataFrame(df1, df2, columns.tail)
    }
  }
}

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val b_new_df = getStructRecursiveDataFrame(a, b, a.columns)
  // Only columns present in both frames are kept. Going through a Set does not
  // preserve the original column order, but both sides use the same order.
  val columns_seq = a.columns.toSet.intersect(b_new_df.columns.toSet).map(col).toSeq
  a.select(columns_seq: _*).union(b_new_df.select(columns_seq: _*))
}
Result
[INFO] DATAFRAME-1 SCHEME
root
|-- emcg_uuid: string (nullable = true)
|-- name: string (nullable = true)
|-- phone_no: string (nullable = true)
|-- dob: string (nullable = true)
|-- country: string (nullable = true)
|-- travel_type: string (nullable = true)
|-- gdpr_restricted_flg: string (nullable = false)
|-- gdpr_reason_code: string (nullable = false)
|-- document: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- skyward: struct (nullable = false)
| |-- tier: string (nullable = false)
| |-- skyward_number: string (nullable = false)
| |-- skyward_points: string (nullable = false)
[INFO] DATAFRAME-2 SCHEME
root
|-- emcg_uuid: string (nullable = true)
|-- name: string (nullable = true)
|-- phone_no: string (nullable = true)
|-- dob: string (nullable = true)
|-- country: string (nullable = true)
|-- travel_type: string (nullable = true)
|-- gdpr_restricted_flg: string (nullable = true)
|-- gdpr_reason_code: string (nullable = true)
|-- document: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- skyward: struct (nullable = false)
| |-- skyward_number: string (nullable = false)
| |-- tier: string (nullable = false)
| |-- skyward_points: string (nullable = false)
[INFO] DATAFRAME SCHEME AFTER THE UNION
root
|-- skyward: struct (nullable = false)
| |-- skyward_number: string (nullable = false)
| |-- tier: string (nullable = false)
| |-- skyward_points: string (nullable = false)
|-- name: string (nullable = true)
|-- document: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- phone_no: string (nullable = true)
|-- travel_type: string (nullable = true)
|-- gdpr_restricted_flg: string (nullable = true)
|-- dob: string (nullable = true)
|-- gdpr_reason_code: string (nullable = true)
|-- country: string (nullable = true)
|-- emcg_uuid: string (nullable = true)
[INFO] TEST CASE FOR ANONYMIZATION VALIDATION
[INFO] INPUT DATA
+----+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|name|phone_no |travel_type|gdpr_restricted_flg|dob |gdpr_reason_code|country|emcg_uuid|document |skyward |
+----+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|ravi|8747436090|freq | |1988-05-28| |dubai |uuid_1 |Map(document_type -> passport, id -> A3343)|[123456,blue,687]|
|aaaa|8747436091|freg | |1988-06-25| |europe |uuid_2 |Map(document_type -> passport, id -> A3341)|[123456,blue,687]|
|bbbb|8747436092|reg | |1988-07-26| |india |uuid_3 |Map(document_type -> passport, id -> A3345)|[123456,blue,687]|
|cccc|8747436093|na | |1988-08-27| |georgia|uuid_4 |Map(document_type -> passport, id -> A3349)|[123456,blue,687]|
|dddd|8747436094|na | |1988-09-29| |swis |uuid_5 |Map(document_type -> passport, id -> B3343)|[123456,blue,687]|
|null|8747436095|freq | |1988-02-30| |us |uuid_6 |Map(document_type -> passport, id -> C3343)|[123456,blue,687]|
|null|8747436096|na | |1988-01-01| |canada |uuid_7 |Map(document_type -> null, id -> D3343) |[123456,blue,687]|
+----+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
[INFO] EXPECTED OUTPUT
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|name |phone_no |travel_type|gdpr_restricted_flg|dob |gdpr_reason_code|country|emcg_uuid|document |skyward |
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|DDDDDDD|9999999 |freq |Y |1988-05-XX|13-001 |XXXXXXX|uuid_1 |Map(document_type -> ZZZZZ, id -> HH343) |[123456,blue,687]|
|aaaa |8747436091|freg | |1988-06-25| |europe |uuid_2 |Map(document_type -> passport, id -> A3341)|[123456,blue,687]|
|DDDDDDD|9999999 |reg |Y |1988-07-XX|13-001 |XXXXXXX|uuid_3 |Map(document_type -> ZZZZZ, id -> HH345) |[123456,blue,687]|
|cccc |8747436093|na | |1988-08-27| |georgia|uuid_4 |Map(document_type -> passport, id -> A3349)|[123456,blue,687]|
|dddd |8747436094|na | |1988-09-29| |swis |uuid_5 |Map(document_type -> passport, id -> B3343)|[123456,blue,687]|
|null |8747436095|freq | |1988-02-30| |us |uuid_6 |Map(document_type -> passport, id -> C3343)|[123456,blue,687]|
|null |9999999 |na |Y |1988-01-XX|13-001 |XXXXXXX|uuid_7 |Map(document_type -> null, id -> HH343) |[123456,blue,687]|
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
[INFO] ACTUAL OUTPUT
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|name |phone_no |travel_type|gdpr_restricted_flg|dob |gdpr_reason_code|country|emcg_uuid|document |skyward |
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|DDDDDDD|9999999 |freq |Y |1988-05-XX|13-001 |XXXXXXX|uuid_1 |Map(document_type -> ZZZZZ, id -> HH343) |[UUUUU,blue,JJ7] |
|aaaa |8747436091|freg | |1988-06-25| |europe |uuid_2 |Map(document_type -> passport, id -> A3341)|[123456,blue,687]|
|DDDDDDD|9999999 |reg |Y |1988-07-XX|13-001 |XXXXXXX|uuid_3 |Map(document_type -> ZZZZZ, id -> HH345) |[UUUUU,blue,JJ7] |
|cccc |8747436093|na | |1988-08-27| |georgia|uuid_4 |Map(document_type -> passport, id -> A3349)|[123456,blue,687]|
|dddd |8747436094|na | |1988-09-29| |swis |uuid_5 |Map(document_type -> passport, id -> B3343)|[123456,blue,687]|
|null |8747436095|freq | |1988-02-30| |us |uuid_6 |Map(document_type -> passport, id -> C3343)|[123456,blue,687]|
|null |9999999 |na |Y |1988-01-XX|13-001 |XXXXXXX|uuid_7 |Map(document_type -> null, id -> HH343) |[UUUUU,blue,JJ7] |
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
If only one field differs and its name is known ("skyward"), it can be resolved like this:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val data = List(("1", "2", "3"))
val bulkDF = data.toDF("emcg_uuid", "tier", "skyward_number")
// union parts: the same struct with its fields in two different orders
val tsDF = bulkDF.withColumn("skyward", struct($"tier", $"skyward_number"))
val stDF = bulkDF.withColumn("skyward", struct($"skyward_number", $"tier"))
// rebuild the struct "skyward" in stDF using the field order of tsDF
val schema = tsDF.schema.fields.find(_.name == "skyward").get
val updatedStructNames: Seq[Column] = schema.dataType.asInstanceOf[StructType].fieldNames.map(name => col("skyward." + name)).toSeq
val withUpdatedSchema = stDF.withColumn("skyward", struct(updatedStructNames: _*))
// union
tsDF.union(withUpdatedSchema).show(false)
If there are many such struct fields, the only option is to apply this in some kind of loop.
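One way such a loop could look is a fold over the reference schema. This is only a sketch: alignStructFields is a hypothetical helper name, the data is made up, a local SparkSession is assumed, and only structs one level deep are handled (structs nested inside structs, arrays or maps are left untouched).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("align-structs-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: for every top-level struct column in `reference`,
// rebuild the column of the same name in `target` so that its fields
// follow the field order used in `reference`.
def alignStructFields(reference: DataFrame, target: DataFrame): DataFrame =
  reference.schema.fields.foldLeft(target) { (df, field) =>
    field.dataType match {
      case st: StructType if df.columns.contains(field.name) =>
        val ordered = st.fieldNames.map(n => col(s"${field.name}.$n"))
        df.withColumn(field.name, struct(ordered: _*))
      case _ => df // non-struct columns are left as they are
    }
  }

// Made-up data: the same struct built with two different field orders.
val base = Seq(("1", "blue", "123456")).toDF("emcg_uuid", "tier", "skyward_number")
val tsDF = base.withColumn("skyward", struct($"tier", $"skyward_number"))
val stDF = base.withColumn("skyward", struct($"skyward_number", $"tier"))

val aligned = alignStructFields(tsDF, stDF)
tsDF.union(aligned).show(false)
```

The fold keeps the helper free of explicit recursion over the column list; every struct column is rewritten with withColumn, which keeps its position in the DataFrame.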
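I ran into the same problem: "Error: Union can only be performed on tables with the compatible column types."
I investigated the following possible causes:
- Schema mismatch between dataset 1 and dataset 2: in my case the schemas matched.
- Column ordering between dataset 1 and dataset 2: a mismatch in column order was causing the problem.
To keep the column order the same in dataset 1 and dataset 2, follow these steps:
- Print the schemas of dataset 1 and dataset 2 to see the current column order.
- Align the columns of dataset 1 to those of dataset 2, or vice versa.
Example: with dataset1(column2, column1) and dataset2(column1, column2), dataset1.select("column1", "column2").union(dataset2) puts the columns in the same order and resolves the problem.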
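The steps above can be sketched as follows, assuming a local SparkSession and made-up values. Instead of typing the column names by hand, the sketch takes the order from dataset2.columns, which is my own variation on the select call in the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("align-columns-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up data: the same two columns in a different order.
val dataset1 = Seq(("b1", "a1")).toDF("column2", "column1")
val dataset2 = Seq(("a2", "b2")).toDF("column1", "column2")

// Step 1: print both schemas to see the current column order.
dataset1.printSchema()
dataset2.printSchema()

// Step 2: select dataset1's columns in dataset2's order, then union.
val aligned = dataset1.select(dataset2.columns.map(col): _*)
val result = aligned.union(dataset2)
result.show(false)
```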