I am using Spark 1.6 with Scala.
I created an index in Elasticsearch from an object. The object's "params" field was created as a Map[String, Map[String, String]]. Example:
val params: Map[String, Map[String, String]] = Map("p1" -> Map("p1_detail" -> "table1"), "p2" -> Map("p2_detail" -> "table2", "p2_filter" -> "filter2"), "p3" -> Map("p3_detail" -> "table3"))
This gives me records like the following:
{
"_index": "x",
"_type": "1",
"_id": "xxxxxxxxxxxx",
"_score": 1,
"_timestamp": 1506537199650,
"_source": {
"a": "toto",
"b": "tata",
"c": "description",
"params": {
"p1": {
"p1_detail": "table1"
},
"p2": {
"p2_detail": "table2",
"p2_filter": "filter2"
},
"p3": {
"p3_detail": "table3"
}
}
}
},
Then I try to read the Elasticsearch index back in order to update the values.
Spark reads the index with the following schema:
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- c: string (nullable = true)
|-- params: struct (nullable = true)
| |-- p1: struct (nullable = true)
| | |-- p1_detail: string (nullable = true)
| |-- p2: struct (nullable = true)
| | |-- p2_detail: string (nullable = true)
| | |-- p2_filter: string (nullable = true)
| |-- p3: struct (nullable = true)
| | |-- p3_detail: string (nullable = true)
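My problem is that the object is read as a struct. To manage and update the fields easily, I would like to have them as a Map, since I am not very familiar with StructType.
I tried to receive the object as a Map of Maps in a UDF, but I get the following error: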
User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(params)' due to data type mismatch: argument 1 requires map<string,map<string,string>> type, however, 'params' is of struct<p1:struct<p1_detail:string>,p2:struct<p2_detail:string,p2_filter:string>,p3:struct<p3_detail:string>> type.;
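UDF code snippet:
val getSubField: Map[String, Map[String, String]] => String = (params: Map[String, Map[String, String]]) => {
  val return_string = params("p1").getOrElse("p1_detail", null.asInstanceOf[String])
  return_string
}
My question: how can we convert this struct to a Map? I have seen the toMap method in the documentation, but since I am a Scala beginner I could not work out how to use it (I am not very familiar with implicit parameters).
Thanks in advance.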
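I finally solved it as follows:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

/* converts the non-null fields of a Row to a Map keyed by field name */
def convertRowToMap[T](row: Row): Map[String, T] = {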
row.schema.fieldNames
.filter(field => !row.isNullAt(row.fieldIndex(field)))
.map(field => field -> row.getAs[T](field))
.toMap
}
/* udf that converts Row to Map */
val rowToMap: Row => Map[String, Map[String, String]] = (row: Row) => {
val mapTemp = convertRowToMap[Row](row)
val mapToReturn = mapTemp.map { case (k, v) => k -> convertRowToMap[String](v) }
mapToReturn
}
val udfrowToMap = udf(rowToMap)
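The UDF above can then be applied to the struct column like any other column function. The following is only a minimal sketch, assuming Spark 1.6 in local mode: the object name `RowToMapUsage`, the `run` method, the `params_map` column name, and the sample JSON document are all made up for illustration, and the two helpers are repeated only so that the sketch compiles on its own.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.{col, udf}

object RowToMapUsage {
  // Same helper as in the answer: keeps only the non-null fields of a Row.
  def convertRowToMap[T](row: Row): Map[String, T] =
    row.schema.fieldNames
      .filter(field => !row.isNullAt(row.fieldIndex(field)))
      .map(field => field -> row.getAs[T](field))
      .toMap

  // Same conversion as in the answer: outer struct first, then each inner struct.
  val rowToMap: Row => Map[String, Map[String, String]] = (row: Row) =>
    convertRowToMap[Row](row).map { case (k, v) => k -> convertRowToMap[String](v) }

  // Hypothetical driver: builds a one-row DataFrame shaped like the
  // Elasticsearch document and returns the converted "params" value.
  def run(): scala.collection.Map[String, scala.collection.Map[String, String]] = {
    val conf = new SparkConf().setAppName("row-to-map-sketch").setMaster("local[1]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      // Sample document (assumption): a subset of the record in the question.
      val json =
        """{"a":"toto","params":{"p1":{"p1_detail":"table1"},"p2":{"p2_detail":"table2","p2_filter":"filter2"}}}"""
      val df = sqlContext.read.json(sc.parallelize(Seq(json)))

      // Add a map-typed column computed from the struct-typed "params" column.
      val rowToMapUdf = udf(rowToMap)
      val withMap = df.withColumn("params_map", rowToMapUdf(col("params")))

      withMap.select("params_map").first()
        .getMap[String, scala.collection.Map[String, String]](0)
    } finally {
      sc.stop()
    }
  }
}
```

Because null fields are filtered out by `convertRowToMap`, keys that are missing from a given document simply do not appear in the resulting map.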
You cannot declare the UDF parameter type as a StructType object; declare the parameter as a Row instead.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

// Schema of the parameter
def schema: StructType = (new StructType).add("p1", (new StructType).add("p1_detail", StringType))
  .add("p2", (new StructType).add("p2_detail", StringType).add("p2_filter", StringType))
  .add("p3", (new StructType).add("p3_detail", StringType))
// Not allowed (does not compile): `schema` is a value, not a type
val extractVal: schema => collection.Map[Nothing, Nothing] = _.getMap(0)
Solution:
// UDF example to process struct column
val extractVal: (Row) => collection.Map[Nothing, Nothing] = _.getMap(0)
// You would implement something similar; note that the function type is Row => String
val getSubField: Row => String =
  (params: Row) =>
  {
    val p1 = params.getAs[Row]("p1")
    .........
    null // placeholder: return the extracted value here
  }
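For illustration, here is one way the skeleton above could be filled in for the `p1_detail` lookup from the question. This is only a sketch and not necessarily what was intended by the elided lines: the object name `GetSubFieldSketch` is hypothetical, and wrapping each step in `Option` is an assumption made here so that a missing `params`, `p1`, or `p1_detail` yields null instead of a NullPointerException.

```scala
import org.apache.spark.sql.Row

object GetSubFieldSketch {
  // Null-safe lookup of params.p1.p1_detail on a struct column received as a Row.
  val getSubField: Row => String = (params: Row) =>
    Option(params)
      .flatMap(p => Option(p.getAs[Row]("p1")))             // inner struct, may be null
      .flatMap(p1 => Option(p1.getAs[String]("p1_detail"))) // leaf value, may be null
      .orNull
}
```

It could then be registered with `udf(GetSubFieldSketch.getSubField)` and applied to the `params` column in the same way as any other UDF.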
I hope this helps!