我有一个Spark DataFrame以下模式。
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
我正在给Cassandra表写同样的内容。
Cassandra表模式如下:
create table provision_bmss.bmss_cust (
partition_key text,
row_key text,
group int,
attributes map<text,text>,
data_as_of_date text,
PRIMARY KEY (partition_key, row_key, group)
)
WITH cdc = 'FALSE'
AND default_time_to_live = '34560000';
我使用Spark Datastax连接器在以下逻辑之后写入表:
val maxItem = 65000
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save()
我得到以下错误:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map(cli_rel_typ_c_00001 -> 01) of type class scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)
我认为这与代码中的.agg(collect_list(map($"key", $"value")).as("attributes"))
行有关。
这里,Map
中的所有元素都是<String, String>
类型
我不能解决同样的问题。谁来帮帮忙。
输出DataFrame模式如下(这不是预期的):
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
预期输出DataFrame模式如下:
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
这个问题的前置问题是根据Map类型列的大小将Spark DataFrame行划分为多行
我参考了这篇文章-如何使用groupBy收集行到一个映射?
我可以使用flatten和toMap函数做同样的事情。
更新工作代码如下:
val joinMap = udf {
values: Seq[Map[String,String]] => values.flatten.toMap
}
def writeToCassandra(dataFrame: DataFrame, keySpace: String, tableName: String) = {
val maxItem = 65000
val dfPreFinal =
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
val dfFinal = dfPreFinal.withColumn("attributes", joinMap(col("attributes")))
.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save()
}
是否有更好的方法(可能没有UDF)?