Pyspark映射调用的函数不会修改全局列表



我已经定义了这个函数,它在全局列表签名上运行,我已经测试了这个函数并且它可以工作。

def add_to_list_initial(x):
global signature
signature.append([x])
print(x)
return x

打印将检查是否调用了该函数。

我必须为Pyspark rdd的每一行运行这个函数,所以我写了这样的代码:

rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(lambda x: add_to_list_initial(x))

但是函数没有被调用,所以为了避免map的"懒惰",我尝试在末尾添加".count((",这样:

rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(lambda x: add_to_list_initial(x)).count()

现在打印完成了。我甚至检查了列表签名是否已更新,但当我尝试打印列表大小时,结果将为0,因为列表根本没有更新。

我甚至尝试使用foreach而不是map,但结果是一样的:

rdd1 = rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x]))
rdd1.foreach(add_to_list_initial)

这些是输出的第一行,在我的Pycharm控制台上用红色写着,甚至还有打印:

19/11/19 21:56:51 WARN TaskSetManager: Stage 2 contains a task of very large size (76414 KB). The maximum recommended task size is 100 KB.
1000052032941703168135263382785614272239884872602
1001548144792848500380180424836160638323674923493
1001192257270049214326810337735024900266705408878
1005273115771118475643621392239203192516851021236
100392090499199786517408984837575190060861208673
1001304115299775295352319010425102201971454728176
1009952688729976061710890304226612996334789156125
1001064097828097404652846404629529563217707288121
1001774517560471388799843553771453069473894089066
1001111820875570611167329779043376285257015448116
1001339474866718130058118603277141156508303423308
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716

如何以有效的方式解决问题?我使用Python 3.7和Pyspark 3.2.1

我这样做是为了获得每组哈希瓦片的最小哈希签名,其中文档的id是

然后,为了计算其他排列,我认为应该这样做:

def add_to_list(x):
global num_announcements
global signature
global i
print(len(signature))
if i == num_announcements:
i = 0
signature[i].append(x)
print(i)
i += 1

for function in hash_functions[1:]:
rdd.map(lambda x: min([str(int.from_bytes(function(str(shingle)), 'big')) for shingle in x])).foreach(add_to_list)

但问题是一样的。我甚至很乐意为我的minhashing问题提出建议,但问题是关于上面描述的问题。

我用这种方式解决了问题,即使我没有找到一个有用的解决方案。

signatures = shingles.flatMap(lambda x: [[(x[1]+1, (x[1]+1)%lsh_b), min([int.from_bytes(function(str(s)), 'big') for s in x[0]])] for function in hash_functions]).cache()

您可以使用一个类(一个可调用(来代替全局变量。

例如:

from collections.abc import Callable

class Signature(Callable):
def __init__(self):
self.signature = []
def __call__(self, x):
self.signature.append([x])
return x

然后,您可以在需要的地方安装此可调用文件:

add_to_list_initial = Signature()
rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(
lambda x: add_to_list_initial(x)
).count()
print(add_to_list_initial.signature)

注意:您可以在这里避免lambda表达式,使用进行简化

rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(
add_to_list_initial
).count()

编辑

为了允许酸洗,您可以使用:

class Signature:
def __init__(self):
self.signature = []
def __call__(self, x):
self.signature.append([x])
return x

相关内容

  • 没有找到相关文章

最新更新