I'm using Dask to apply Snorkel labeling functions over several datasets, but it seems to take a very long time. Is this normal?

My problem is the following: I have several datasets (900K, 1.7M and 1.7M entries) in CSV format, which I load into multiple Dask dataframes. I then concatenate them all into a single Dask dataframe that I can feed to my Snorkel applier, which applies a bunch of labeling functions to each row of the dataframe and returns a numpy array with as many rows as the dataframe and as many columns as there are labeling functions.

The call to the Snorkel applier seems to take forever when I use all 3 datasets (more than 2 days...). However, if I run the code with only the first dataset, the call takes about 2 hours. In that case, of course, I skip the concatenation step.

So I'm wondering what is going on here. Should I change the number of partitions in the concatenated dataframe? Or maybe I'm just not using Dask properly in the first place?

Here is the code I am using:
from snorkel.labeling.apply.dask import DaskLFApplier
import dask.dataframe as dd
import numpy as np
import os
import time
from datetime import timedelta

start = time.time()
applier = DaskLFApplier(lfs)  # lfs are the labeling functions that will be applied; one of them featurizes one of the columns of my dataframe and applies a sklearn classifier (I set n_jobs to None when loading the model)

# If I have only one CSV to read
if isinstance(PATH_TO_CSV, str):
    training_data = dd.read_csv(PATH_TO_CSV, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'})
    slices = None

# If I have several CSVs
elif isinstance(PATH_TO_CSV, list):
    training_data_list = [dd.read_csv(path, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'}) for path in PATH_TO_CSV]
    training_data = dd.concat(training_data_list, axis=0)
    # some bookkeeping so I know where to slice the final result and can assign each part back to its dataset
    df_sizes = [len(df) for df in training_data_list]
    cut_idx = np.insert(np.cumsum(df_sizes), 0, 0)
    slices = list(zip(cut_idx[:-1], cut_idx[1:]))

# The call that lasts forever: I tested all the code above without this line on my 3 datasets and it runs perfectly fine
L_train = applier.apply(training_data)

end = time.time()
print('Time elapsed: {}'.format(timedelta(seconds=end - start)))
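For clarity, the slice-boundary bookkeeping above can be demonstrated in isolation with toy sizes (the `df_sizes` values here are illustrative, not the real dataset lengths):

```python
import numpy as np

# Hypothetical per-dataset row counts standing in for len(df) of each dataframe
df_sizes = [5, 3, 4]

# Prepend 0 to the cumulative sums to get each dataset's start/end offsets
# inside the concatenated dataframe
cut_idx = np.insert(np.cumsum(df_sizes), 0, 0)  # array([ 0,  5,  8, 12])
slices = list(zip(cut_idx[:-1], cut_idx[1:]))   # [(0, 5), (5, 8), (8, 12)]
```

Each `(start, end)` pair can then be used to slice the label matrix `L_train` back into per-dataset blocks.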

If you need any more information, I will do my best to provide it. Thanks in advance for your help :)

It seems that, by default, the applier function uses processes and therefore cannot benefit from any extra workers you may have:

# add this to the beginning of your code
from dask.distributed import Client
client = Client()
# you can see the address of the client by typing `client` and opening the dashboard
# skipping your other code
# you need to pass the client explicitly to the applier
# after launching this open the dashboard and watch the workers work :)
L_train = applier.apply(training_data, scheduler=client)
