Pandas apply not working in Spark parallelized code



I am trying to use Pandas "apply" inside parallelized code, but the apply does not run at all. When using Spark (parallelizing over an RDD), can we use "apply" in the code that is shipped to the executors?

Code:

import pandas as pd
from pyspark import SparkContext

def testApply(k):
    return pd.DataFrame({'col1': k, 'col2': [k * 2] * 5})

def testExec(x):
    df = pd.DataFrame({'pblkGroup': range(0, 10)})
    ddf = pd.DataFrame(columns=['col1', 'col2'])
    # In my case the line below doesn't get executed at all
    res = df.apply(lambda row: testApply(row.pblkGroup) if row.pblkGroup % 2 == 0 else pd.DataFrame(), axis=1)

list1 = [1, 2, 3, 4]
sc = SparkContext.getOrCreate()
testRdd = sc.parallelize(list1)
output = testRdd.map(lambda x: testExec(x)).collect()

There are two options for using Pandas with Spark:

Using closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope are a frequent source of confusion. In the example below we look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.
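The counter pitfall mentioned above, roughly as it appears in the Spark programming guide [1] (a sketch; assumes an existing SparkContext `sc`):

counter = 0
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Wrong: in cluster mode each executor updates its own copy of `counter`,
# so the driver's `counter` still ends up as 0
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print("Counter value:", counter)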

More details can be found here [1].

Example

import numpy as np
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# assumes an existing sqlContext / SparkSession
spk_df = sqlContext.createDataFrame(
    [[0, 1, 0, 0], [1, 1, 0, 0], [0, 0, 1, 0], [1, 0, 1, 1], [1, 1, 0, 0]],
    ['t1', 't2', 't3', 't4'])
spk_df.show()

# B and V are captured by the UDF's closure and shipped to the executors
B = [2, 0, 1, 0]
V = [5, 1, 2, 4]

def V_sum(row, b, c):
    # element-wise comparison needs numpy arrays; with plain lists row == b is a single bool
    return float(np.sum(np.array(c)[np.array(row) == np.array(b)]))

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns)))).show()

More details can be found here [2].

Using Pandas UDFs

As of Spark 2.4.4, there is an out-of-the-box way to use Pandas with Spark. Details and examples can be found here [3].
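A minimal scalar pandas_udf sketch along the lines of the examples in [3] (assumes Spark 2.4.x with pyarrow installed and an existing SparkSession `spark`; `times_two` is just an illustrative name):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# A scalar Pandas UDF receives a batch of column values as a pandas.Series
# and must return a pandas.Series of the same length
@pandas_udf('double', PandasUDFType.SCALAR)
def times_two(s):
    return s * 2

df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ['v'])
df.withColumn('v2', times_two(df['v'])).show()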

1 - http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures
2 - Custom function over pyspark dataframe
3 - https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html

Pandas versions below 0.21 do not seem to support this. I upgraded my Pandas version and it now works fine.
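A quick sketch to check which Pandas version your environment (and the Spark workers) pick up:

import pandas as pd

# Versions below 0.21 reportedly fail here; upgrade with: pip install --upgrade pandas
print(pd.__version__)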

I also got the same error (TypeError: an integer is required (got type bytes)):

from pyspark.context import SparkContext

TypeError                                 Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_11288/3937779276.py in <module>
----> 1 from pyspark.context import SparkContext

~\miniconda3\lib\site-packages\pyspark\__init__.py in <module>
     49 
     50 from pyspark.conf import SparkConf
---> 51 from pyspark.context import SparkContext
     52 from pyspark.rdd import RDD, RDDBarrier
     53 from pyspark.files import SparkFiles

~\miniconda3\lib\site-packages\pyspark\context.py in <module>
     29 from py4j.protocol import Py4JError
     30 
---> 31 from pyspark import accumulators
     32 from pyspark.accumulators import Accumulator
     33 from pyspark.broadcast import Broadcast, BroadcastPickleRegistry

~\miniconda3\lib\site-packages\pyspark\accumulators.py in <module>
     95     import socketserver as SocketServer
     96 import threading
---> 97 from pyspark.serializers import read_int, PickleSerializer
     98 
     99 

~\miniconda3\lib\site-packages\pyspark\serializers.py in <module>
     69     xrange = range
     70 
---> 71 from pyspark import cloudpickle
     72 from pyspark.util import _exception_message
     73 

~\miniconda3\lib\site-packages\pyspark\cloudpickle.py in <module>
    143 
    144 
--> 145 _cell_set_template_code = _make_cell_set_template_code()
    146 
    147 

~\miniconda3\lib\site-packages\pyspark\cloudpickle.py in _make_cell_set_template_code()
    124         )
    125     else:
--> 126         return types.CodeType(
    127             co.co_argcount,
    128             co.co_kwonlyargcount,

TypeError: an integer is required (got type bytes)
