Spark "package.TreeNodeException" Error Python "java.lang.RuntimeException: Couldn't find pythonUDF



I am using PySpark 2.1 on Databricks.

I have written a UDF that generates a unique uuid for each row of a PySpark dataframe. The dataframes I am working with are relatively small (fewer than 10,000 rows) and will never grow beyond that.

I know there are built-in Spark functions zipWithIndex() and zipWithUniqueId() for generating row indexes, but I have been specifically asked to use uuids for this particular project.
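
(For comparison only, a minimal sketch of that built-in approach, assuming an existing DataFrame named df and the usual spark session; it is not what was asked for here, just context for the uuid requirement.)

# Sketch only (not used in this project): add a sequential row index with the
# built-in RDD zipWithIndex(), assuming an existing DataFrame named df.
from pyspark.sql.types import LongType, StructField, StructType

def with_row_index(df):
    # pair each Row with a sequential index, then rebuild the DataFrame
    indexed_rdd = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    new_schema = StructType(df.schema.fields + [StructField("row_index", LongType(), False)])
    return spark.createDataFrame(indexed_rdd, new_schema)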

The UDF udf_insert_uuid works well on small datasets, but it seems to conflict with the built-in Spark function subtract.

This is the error it causes:

package.TreeNodeException: Binding attribute, tree: pythonUDF0#104830

Deeper in the driver stack trace, it also says:

Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#104830

This is the code I run below:

Create a function to generate a set of unique_ids

import pandas
from pyspark.sql.functions import *
from pyspark.sql.types import *
import uuid
#define a python function
def insert_uuid():
    user_created_uuid = str( uuid.uuid1() )
    return user_created_uuid
#register the python function for use in dataframes
udf_insert_uuid = udf(insert_uuid, StringType())
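
(Side note, not required for the problem below: since a temp view is created later, the same python function could also be registered under a SQL-callable name; a minimal sketch, where "insert_uuid_sql" is just an arbitrary name I chose.)

# Optional sketch: register the same python function under a SQL-callable name
# ("insert_uuid_sql" is an arbitrary name) so it could be used in queries
# against a temp view such as "temp_spark_table" created below.
spark.udf.register("insert_uuid_sql", insert_uuid, StringType())
# spark.sql("SELECT *, insert_uuid_sql() AS id FROM temp_spark_table").show()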

Create a dataframe with 50 elements

import pandas
from pyspark.sql.functions import *
from pyspark.sql.types import *
list_of_numbers = range(1000,1050)
temp_pandasDF = pandas.DataFrame(list_of_numbers, index=None)
sparkDF = (
    spark
    .createDataFrame(temp_pandasDF, ["data_points"])
    .withColumn("labels", when( col("data_points") < 1025, "a" ).otherwise("b"))    # if "data_points" < 1025, then "labels" = "a", else "labels" = "b"
    .repartition("labels")
)
sparkDF.createOrReplaceTempView("temp_spark_table")
#add a unique id for each row
#udf works fine in the line of code here
sparkDF = sparkDF.withColumn("id", lit( udf_insert_uuid() ))
sparkDF.show(20, False)

sparkDF output:

+-----------+------+------------------------------------+
|data_points|labels|id                                  |
+-----------+------+------------------------------------+
|1029       |b     |d3bb91e0-9cc8-11e7-9b70-00163e9986ba|
|1030       |b     |d3bb95e6-9cc8-11e7-9b70-00163e9986ba|
|1035       |b     |d3bb982a-9cc8-11e7-9b70-00163e9986ba|
|1036       |b     |d3bb9a50-9cc8-11e7-9b70-00163e9986ba|
|1042       |b     |d3bb9c6c-9cc8-11e7-9b70-00163e9986ba|
+-----------+------+------------------------------------+
only showing top 5 rows

Create another DF with values that differ from sparkDF

list_of_numbers = range(1025,1075)
temp_pandasDF = pandas.DataFrame(list_of_numbers, index=None)
new_DF = (
    spark
    .createDataFrame(temp_pandasDF, ["data_points"])
    .withColumn("labels", when( col("data_points") < 1025, "a" ).otherwise("b"))    # if "data_points" < 1025, then "labels" = "a", else "labels" = "b"
    .repartition("labels")
)
new_DF.show(5, False)

new_DF output:

+-----------+------+
|data_points|labels|
+-----------+------+
|1029       |b     |
|1030       |b     |
|1035       |b     |
|1036       |b     |
|1042       |b     |
+-----------+------+
only showing top 5 rows

Compare the values in new_DF with sparkDF

values_not_in_new_DF = (new_DF.subtract(sparkDF.drop("id")))
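
(For reference, subtract is a set difference over all columns; a left_anti join over both columns expresses the same thing, sketched below, though I have not verified whether it behaves any differently with the UDF.)

# Sketch of an equivalent set difference using a left_anti join over both
# columns; not verified to behave differently with the UDF than subtract does.
values_not_in_new_DF_alt = new_DF.join(
    sparkDF.drop("id"),
    on=["data_points", "labels"],
    how="left_anti"
)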

Add a uuid to each row with the udf and display it

display(values_not_in_new_DF
    .withColumn("id", lit( udf_insert_uuid() ))   # add a column of unique uuid's
)

The following error appears:

package.TreeNodeException: Binding attribute, tree: pythonUDF0#104830
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#104830
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:473)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:472)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:472)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:610)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:313)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2807)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
  at org.apache.spark.sql.Dataset$$anonfun$60.apply(Dataset.scala:2791)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2132)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2345)
  at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:81)
  at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:42)
  at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBuffer$1.apply(PythonDriverLocal.scala:461)
  at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBuffer$1.apply(PythonDriverLocal.scala:441)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:394)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBuffer(PythonDriverLocal.scala:441)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.com$databricks$backend$daemon$driver$PythonDriverLocal$$outputSuccess(PythonDriverLocal.scala:428)
  at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$3.apply(PythonDriverLocal.scala:178)
  at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$3.apply(PythonDriverLocal.scala:175)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:394)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:175)
  at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230)
  at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211)
  at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39)
  at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39)
  at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211)
  at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
  at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
  at scala.util.Try$.apply(Try.scala:161)
  at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584)
  at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348)
  at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#104830 in [data_points#104799L,labels#104802]
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 82 more

When I ran your script, I got the same error. The only way I found to make it work was to pass a column to the UDF rather than no parameters:

def insert_uuid(col):
    user_created_uuid = str( uuid.uuid1() )
    return user_created_uuid
udf_insert_uuid = udf(insert_uuid, StringType())

Then call it on a column, for example labels:

(values_not_in_new_DF
    .withColumn("id", udf_insert_uuid("labels"))
    .show())

No lit needed.
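
A quick way to sanity-check the result of that workaround (same values_not_in_new_DF and the column-taking udf_insert_uuid defined just above) is to confirm there is one distinct uuid per row; a minimal sketch:

# Apply the column-taking UDF after subtract and verify the ids are unique.
result_DF = values_not_in_new_DF.withColumn("id", udf_insert_uuid("labels"))
assert result_DF.select("id").distinct().count() == result_DF.count()
result_DF.show(10, False)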
