我正在加载一个数据从拼花在火花其中一列是二进制类型。我想将此列转换为几何类型,为此我使用Apache Sedona/GeoSpark。我使用st_geomfromwkb
,但得到错误
df = spark.read.parquet("dbfs:/FileStore/tables/geometry.parquet")
df.printSchema()
root
|-- geo_key: string (nullable = true)
|-- STATEFP: string (nullable = true)
|-- geometry: binary (nullable = true)
df.createOrReplaceTempView("geo_cali")
spark.sql("select geo_key, state, st_geomfromwkb(geometry) as geometry from geo_cali").show()
get below Error:
ClassCastException: [B cannot be cast to org.apache.spark.unsafe.types.UTF8String
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 19) (10.139.64.4 executor 0): java.lang.ClassCastException: [B cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKB.eval(Constructors.scala:176)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:81)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2712)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2706)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1255)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1255)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1255)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2914)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2902)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2446)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:289)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:299)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:512)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:511)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:399)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:59)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3018)
at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3009)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3802)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3800)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3008)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:194)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:57)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:411)
at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:839)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:375)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:897)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:775)
at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:854)
at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:652)
at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:817)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:224)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:775)
at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:211)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:544)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:240)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:235)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:232)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:53)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:279)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:271)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:53)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:521)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKB.eval(Constructors.scala:176)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:81)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
我能够将列转换为几何类型。由于数据的格式是WKB,所以它是十六进制的。我将列转换为十六进制,然后使用st_geomfromwkb函数。它工作。
df5 = df.withColumn("geometry", hex(col("geometry")))
df5.createOrReplaceTempView("tbl2")
df6 = spark.sql("select geo_key, state, st_geomfromwkb(geometry) as geometry from tbl2")
这个问题已经被Sedona -122修复了。
ST_GeomFromWKB现在重载二进制和十六进制代码:https://sedona.apache.org/setup/release-notes/#sedona-121
你可以直接做ST_GeomFromWKB(二进制)