写pyspark UDF和初始化代码



我想运行一个带有初始化代码的pyspark UDF,每个Python进程只运行一次。我不需要保证这段代码只运行一次,但出于性能考虑,我不希望它对每一行都运行。

在scala中我可以这样做:

object MyUDF {
// This code runs once per JVM (executor), initialize heavy objects
}
class MyUDF extends UDF2[Double, Double, String] {
override def call(lat: Double, long: Double): String = {
// This code runs per record, 
but can use the static objects that are already initialized
}
}

在Python中可能有类似的东西吗?我知道每个执行器都有一个Python进程来响应执行器的UDF调用。

让我们考虑一个简单的示例,其中大对象是一个列表,您的UDF只是检查一个元素是否在该列表中。

一种方法是简单地在UDF定义之外定义对象。因此,初始化代码将只在驱动程序中执行一次。
from pyspark.sql import functions as f
big_list = [1, 2, 4, 6, 8, 10]
is_in_list = f.udf(lambda x: x in big_list)
spark.range(10).withColumn("x", is_in_list(f.col("id"))).show()

收益率:

+---+-----+
| id|    x|
+---+-----+
|  0|false|
|  1| true|
|  2| true|
|  3|false|
|  4| true|
|  5|false|
|  6| true|
|  7|false|
|  8| true|
|  9|false|
+---+-----+

代码的问题对于scala代码,spark将为每个任务提供该对象的副本。如果您关心的只是初始化对象所需的时间,那也没关系。但如果对象太大,可能会影响工作的性能。

要解决这个问题,可以使用广播变量。事实上,根据spark的文档:

广播变量允许程序员在每台机器上缓存一个只读变量,而不是在任务中附带一个副本。

代码将非常相似:

from pyspark.sql import functions as f
big_list = [1, 2, 4, 6, 8, 10]
big_list_bc = sc.broadcast(big_list)
is_in_list = f.udf(lambda x: x in big_list_bc.value)
spark.range(10).withColumn("x", is_in_list(f.col("id"))).show()

pandas_udf的几种类型之一是Iterator[pd.Series]) ->迭代器(pd.Series)。
在映射每个pd之前。你可以有一个init阶段,每个executor运行一次。

在这里可以找到关于这个主题的很好的指南。

@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
<init something>  
for x in batch_iter:
yield <your code here>

最新更新