当python函数比它们更快时,为什么我们要使用pyspark UDF?(注意.不用担心spark SQL命令)



我有一个数据帧:

df = (spark
.range(0, 10 * 1000 * 1000)
.withColumn('id', (col('id') / 1000).cast('integer'))
.withColumn('v', rand()))

输出:

+---+-------------------+
| id|                  v|
+---+-------------------+
|  0|0.05011803459635367|
|  0| 0.6749337782428327|
|  0| 0.9449105904567048|
|  0| 0.9183605955607251|
|  0|  0.648596393346793|
+---+-------------------+

现在,可以通过SQL函数和UDF对"v"进行简单的加1操作。

如果我们忽略SQL(最佳性能(

我们可以创建一个UDF作为:

@udf("double")
def plus_one(v):
return v + 1

并称之为

df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

时间:16.5秒

但我的问题是:

如果我使用udf并直接写入:

def plus_one(v):
return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

耗时-352ms

简言之,UDF查询大约需要16秒,而正常的python函数需要大约350ms的

相比之下,

df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()

时间:347ms

这是我的困境:

如果我可以用一个正常的python函数执行相同的场景,该函数的性能与内置函数相比。。。

Q。为什么我们不直接使用python函数呢?

Q。注册UDF是否只在我们计划像命令一样在SQL中使用它时才重要?

我们不这么做肯定有一些优化的原因…或者可能和星火集群的工作方式有关?

[已经回答了两个问题,但这两个问题都以"SQL内置函数是首选…"结束;我正在比较python函数和UDF,以及它在pyspark应用程序中的可行性。]

编辑:我也用pandas_udf做到了这一点:

@pandas_udf('double')
def vectorized_plus_one(v):
return v + 1
df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()

时间:5.26秒

我附上了一张截图:

将1添加到值的输出-Python函数(独立(、UDF、SQL

您的场景之所以有效,是因为实际上您没有在python中添加1,而是在Java中添加的,其方式与使用SQL时使用的方式非常相似。

让我们分开来看:

  1. 执行plus_one(df.v),这等于刚刚通过df.v + 1
  2. 尝试在您喜欢的repl中键入df.v + 1,您会看到它返回类型为Column的对象
  3. 怎么可能呢?Column类覆盖了__radd__魔术方法(以及其他一些方法(,并返回新的Column实例,其中包含向指定列添加1的指令

总之:withColumn总是接受Column类型的对象作为第二个参数,在列中添加1的技巧就是python的魔力。

这就是为什么它比udfvectorized udf工作得更快:它们需要运行python进程,串行化/反序列化数据(矢量化udf可以更快地使用arrow来避免串行化/反串行化(,在较慢的python进程中计算。

最新更新