我有一个数据帧:
df = (spark
.range(0, 10 * 1000 * 1000)
.withColumn('id', (col('id') / 1000).cast('integer'))
.withColumn('v', rand()))
输出:
+---+-------------------+
| id| v|
+---+-------------------+
| 0|0.05011803459635367|
| 0| 0.6749337782428327|
| 0| 0.9449105904567048|
| 0| 0.9183605955607251|
| 0| 0.648596393346793|
+---+-------------------+
现在,可以通过SQL函数和UDF对"v"进行简单的加1操作。
如果我们忽略SQL(最佳性能(
我们可以创建一个UDF作为:
@udf("double")
def plus_one(v):
return v + 1
并称之为
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
时间:16.5秒
但我的问题是:
如果我不使用udf并直接写入:
def plus_one(v):
return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
耗时-352ms
简言之,UDF查询大约需要16秒,而正常的python函数需要大约350ms的
相比之下,
df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()
时间:347ms
这是我的困境:
如果我可以用一个正常的python函数执行相同的场景,该函数的性能与内置函数相比。。。
Q。为什么我们不直接使用python函数呢?
Q。注册UDF是否只在我们计划像命令一样在SQL中使用它时才重要?
我们不这么做肯定有一些优化的原因…或者可能和星火集群的工作方式有关?
[已经回答了两个问题,但这两个问题都以"SQL内置函数是首选…"结束;我正在比较python函数和UDF,以及它在pyspark应用程序中的可行性。]
编辑:我也用pandas_udf做到了这一点:
@pandas_udf('double')
def vectorized_plus_one(v):
return v + 1
df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
时间:5.26秒
我附上了一张截图:
将1添加到值的输出-Python函数(独立(、UDF、SQL
您的场景之所以有效,是因为实际上您没有在python中添加1,而是在Java中添加的,其方式与使用SQL时使用的方式非常相似。
让我们分开来看:
- 执行
plus_one(df.v)
,这等于刚刚通过df.v + 1
- 尝试在您喜欢的repl中键入
df.v + 1
,您会看到它返回类型为Column
的对象 - 怎么可能呢?
Column
类覆盖了__radd__
魔术方法(以及其他一些方法(,并返回新的Column
实例,其中包含向指定列添加1的指令
总之:withColumn
总是接受Column
类型的对象作为第二个参数,在列中添加1的技巧就是python的魔力。
这就是为什么它比udf
和vectorized udf
工作得更快:它们需要运行python进程,串行化/反序列化数据(矢量化udf可以更快地使用arrow
来避免串行化/反串行化(,在较慢的python进程中计算。