将自定义功能应用于dataframe的数组类型的列



我有一个带有名为'counts'的列的数据框,我想将自定义函数" do_something"应用于列的每个元素,这意味着每个数组。我不想修改数据框,我只想对列计数进行单独的操作。列的所有阵列具有相同的大小。

+----------------------+---------------------------------------+
|id|              counts|
+----------------------+---------------------------------------+
|1|          [8.0, 2.0, 3.0|
|2|          [1.0, 6.0, 3.0|                
+----------------------+---------------------------------------+

当我尝试此操作时:

df.select('counts').rdd.foreach(lambda x: do_something(x))

即使我在没有lambda的情况下尝试也会出现相同的错误。

它在上面的线上失败

py4jjavaerror追溯(最新通话 last)in() ----> 1 df.Select('counts')。rdd.foreach(lambda x:do_something(x))

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in foreach(self,f) 745 F(x) 746返回iter([]) -> 747 self.mappartitions(ProcessPartition).count()#力量评估 748 749 def foreach部分(self,f):

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in Count(self)1002 3 1003" -> 1004返回self.mappartitions(lambda i:[sum(1 for _ in i)])。

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in sum(self) 993 6.0 994" -> 995返回self.mappartitions(lambda x:[sum(x)])。fold(0,operator.add) 996 997 def计数(self):

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py Zerovalue,OP) 提供给每个分区的867#Zerovalue是从提供的独特的 868#到最后的减少电话 -> 869 vals = self.mappartitions(func).collect() 870返回减少(OP,Vals,Zerovalue) 871

/usr/hdp/2.5.3.0-37/spark/python/pyspark/rdd.py in Collect(self) 769" 770与sccallsitessync(self.context)作为CSS: -> 771端口= self.ctx._jvm.pythonrdd.collectandserve(self._jrdd.rdd()) 772返回列表(_load_from_socket(端口,self._jrdd_deserializer)) 773

/USR/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip.zip/py4j/java_gateway.pypy 在 call (self, *args)中 811答案= self.gateway_client.send_command(命令) 812 return_value = get_return_value( -> 813答案,self.gateway_client,self.target_id,self.name) 814 temp_args中的temp_arg的815:

/usr/hdp/2.5.3.0-37/spark/python/pyspark/sql/sql/utils.py in Deco(*a, ** KW) 43 def deco(*a,** kw): 44尝试: ---> 45返回f(*a,** kw) 46除py4j.protocol.py4jjavaerror为e: 47 s = e.java_exception.tostring()

/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip.zip/py4j/protocol.py.py 在get_return_value中 306提高py4jjavaerror( 307"调用{0} {1} {2}。 n"时发生错误。 -> 308格式(target_id,"。",名称),值) 309其他: 310提高py4jerror(

尽管所有输入阵列的大小相同。

big_list=[]
def do_something(i_array):
    outputs = custom_library(i_array) # takes as input an array and returns 3 new lists
    big_list.extend(outputs)

您的 UDF修改了一个python对象,即:

  • 数据帧的外观,即使函数奏效,您也无法访问该值,因为您没有将其返回到数据框的行
  • 巨大,它的元素的元素至少是数据框中的行数量的三倍。

您可以尝试这样做:

def do_something(i_array):
    outputs = custom_library(i_array)
    return outputs
import pyspark.sql.functions as psf
do_something_udf = psf.udf(do_something, ArrayType(ArrayType(DoubleType()))

DoubleType()或您返回的任何类型

df.withColumn("outputs", psf.explode(do_something_udf("count")))

您的行是df

的三倍

相关内容

  • 没有找到相关文章

最新更新