What parameter type should a user-defined function take to accept a nested JSON structure with arrays?



I have the following nested JSON object (cellsDF) describing polygons (hence Scala and Spark):

root
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- Cell: string (nullable = true)
 |    |-- SignalStyrka: long (nullable = true)
 |-- type: string (nullable = true)

An example row is

{ "type": "Feature", "properties": { "SignalStyrka": -82, "Cell": " 112" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 1292600.0, 6246350.0 ], [ 1292600.0, 6246400.0 ], [ 1292550.0, 6246400.0 ], [ 1292550.0, 6246350.0 ], [ 1292600.0, 6246350.0 ] ] ] } }

I want to find the polygons that include a given point. I have written UDFs in Scala to find these, but it seems Spark doesn't like the way I'm trying to run a UDF over this nested JSON. Note that the actual point-in-polygon logic hasn't been written yet; I just want to verify that the whole concept works with a UDF. candidateCells is defined earlier, and that part of the code is working. I have tried some of the suggested approaches (e.g. explode) without success. I got this working in Python but was not satisfied with the performance. Any support would be greatly appreciated.

val cellsDF = spark.read.json("s3n://coverage-vectors/20170509/*.json.gz")
cellsDF: org.apache.spark.sql.DataFrame = [geometry: struct<coordinates: array<array<array<string>>>, type: string>, properties: struct<Cell: string, SignalStyrka: bigint> ... 1 more field]
def isCandidate(cell: String): Boolean = {
    candidateCells contains cell
}
def inPolygon(coordinates: Array[Array[Array[String]]]): Boolean = {
    coordinates.isEmpty
}
import org.apache.spark.sql.functions.udf
val udfCandidate = udf(isCandidate _)
val udfInPolygon = udf(inPolygon _)
cellsDF.filter(udfCandidate($"properties.Cell")).filter(udfInPolygon($"geometry.coordinates")).count()
isCandidate: (cell: String)Boolean
inPolygon: (coordinates: Array[Array[Array[String]]])Boolean
import org.apache.spark.sql.functions.udf
udfCandidate: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(StringType)))
udfInPolygon: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(ArrayType(ArrayType(ArrayType(StringType,true),true),true))))
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 179, ip-172-31-12-172.eu-west-1.compute.internal, executor 3): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<array<array<string>>>) => boolean)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [[[Ljava.lang.String;
    at $anonfun$1.apply(<console>:36)
    ... 13 more

Change the signature of inPolygon to accept Seq[Seq[Seq[String]]] and you're done. Spark passes an ArrayType column to a UDF as a scala.collection.mutable.WrappedArray (which is a Seq), not as a plain Java array, which is exactly what the ClassCastException in your stack trace is complaining about.

scala> in.printSchema
root
 |-- id: integer (nullable = false)
 |-- coordinates: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: string (containsNull = true)
val myUDF = udf { coordinates: Seq[Seq[Seq[String]]] => 1 }
scala> in.select(myUDF($"coordinates")).show
+----------------+
|UDF(coordinates)|
+----------------+
|               1|
+----------------+
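
Applied back to the question's code, the fix is nothing more than the signature change. A minimal sketch, assuming candidateCells, udfCandidate and cellsDF are defined exactly as in the question:

import org.apache.spark.sql.functions.udf

def inPolygon(coordinates: Seq[Seq[Seq[String]]]): Boolean = {
    // Placeholder body from the question; the real point-in-polygon test goes here.
    coordinates.isEmpty
}
val udfInPolygon = udf(inPolygon _)
cellsDF.filter(udfCandidate($"properties.Cell")).filter(udfInPolygon($"geometry.coordinates")).count()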

You can see the conversion happening in ArrayConverter and, at a much lower level, in CatalystTypeConverters.getConverterForType.
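
For completeness, here is one way the eventual point-in-polygon UDF could look. This is only a sketch, not the asker's method: it assumes a standard ray-casting test, that coordinates.head is the exterior ring of the GeoJSON polygon, and a hypothetical target point (px, py) captured via closure:

import org.apache.spark.sql.functions.udf

val px = 1292575.0  // hypothetical target point, x (not from the question)
val py = 6246375.0  // hypothetical target point, y (not from the question)

def inPolygon(coordinates: Seq[Seq[Seq[String]]]): Boolean = {
    // Take the exterior ring and parse the string coordinates into doubles.
    val ring = coordinates.head.map(v => (v(0).toDouble, v(1).toDouble))
    // Ray casting: count how many polygon edges a horizontal ray from
    // (px, py) crosses; an odd count means the point is inside.
    var inside = false
    var j = ring.length - 1
    for (i <- ring.indices) {
        val (xi, yi) = ring(i)
        val (xj, yj) = ring(j)
        if ((yi > py) != (yj > py) &&
            px < (xj - xi) * (py - yi) / (yj - yi) + xi)
            inside = !inside
        j = i
    }
    inside
}

val udfInPolygon = udf(inPolygon _)

The duplicated closing vertex that GeoJSON rings carry is harmless here, since a zero-length edge never satisfies the crossing condition.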
