在DASK并行化中如何使用广播变量



我有一些代码在dask袋上应用地图功能。我需要一个查找字典来应用该功能,并且它不适用于客户端。

我不知道我是否在做正确的事情,因为工人开始了,但是他们什么也没做。我尝试了不同的配置来寻找不同的示例,但是我无法正常工作。任何支持都将不胜感激。

我知道从Spark中知道,您定义了一个广播变量,然后通过变量访问内容。在要应用的函数内部的值。我看不到Dask。

# Function to map
def transform_contacts_add_to_historic_sin(data,historic_dict):
    raw_buffer = ''
    line = json.loads(data)
    if line['timestamp] > historic_dict['timestamp]:
        raw_buffer = raw_buffer + line['vid']
    return raw_buffer
# main program
# historic_dict is a dictionary previously filled, which is the lookup variable for map function
# file_records will be a list of json.dump getting from a S3 file
from distributed import Client
client = Client()
historic_dict_scattered = client.scatter(historic_dict, broadcast=True)
file_records = []
raw_data = s3_procedure.read_raw_file(... S3 file.......)
data = TextIOWrapper(raw_data)
for line in data:
   file_records.append(line)
bag_chunk = db.from_sequence(file_records, npartitions=16)
bag_transform = bag_chunk.map(lambda x: transform_contacts_add_to_historic(x), args=[historic_dict_scattered])
bag_transform.compute()

如果您的字典很小,则可以直接包含

def func(partition, d):
    return ...
my_dict = {...}
b = b.map(func, d=my_dict)

如果它很大,那么您可能需要将其包装在dask中,延迟

my_dict = dask.delayed(my_dict)
b = b.map(func, d=my_dict)

如果它很大,那么是的,您可能想先散布它(尽管如果事情与上面的任何一种方法奏效,我会避免这种情况(。

[my_dict] = client.scatter([my_dict])
b = b.map(func, d=my_dict)

最新更新