com.datastax.spark.connector.types.TypeConversionException:不



我有一个Spark DataFrame以下模式。

root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- attributes: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)

我正在给Cassandra表写同样的内容。

Cassandra表模式如下:

create table provision_bmss.bmss_cust (
partition_key text,
row_key text,
group int,
attributes map<text,text>,
data_as_of_date text,
PRIMARY KEY (partition_key, row_key, group)
)
WITH cdc = 'FALSE'
AND default_time_to_live = '34560000';

我使用Spark Datastax连接器在以下逻辑之后写入表:

val maxItem = 65000

dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save()

我得到以下错误:

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map(cli_rel_typ_c_00001 -> 01) of type class scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)

我认为这与代码中的.agg(collect_list(map($"key", $"value")).as("attributes"))行有关。

这里,Map中的所有元素都是<String, String>类型

我不能解决同样的问题。谁来帮帮忙。

输出DataFrame模式如下(这不是预期的):

root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: array (nullable = true)
|    |-- element: map (containsNull = true)
|    |    |-- key: string
|    |    |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)

预期输出DataFrame模式如下:

root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)

这个问题的前置问题是根据Map类型列的大小将Spark DataFrame行划分为多行

我参考了这篇文章-如何使用groupBy收集行到一个映射?

我可以使用flatten和toMap函数做同样的事情。

更新工作代码如下:

val joinMap = udf {
values: Seq[Map[String,String]] => values.flatten.toMap
}
def writeToCassandra(dataFrame: DataFrame, keySpace: String, tableName: String) = {

val maxItem = 65000

val dfPreFinal = 
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")

val dfFinal = dfPreFinal.withColumn("attributes", joinMap(col("attributes")))

.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save() 
}

是否有更好的方法(可能没有UDF)?

相关内容

  • 没有找到相关文章

最新更新