NLP节进程中的任务分区或延迟



我正在和Stanza一起处理一个NLP过程。Stanza需要很长时间来运行NLP过程,我知道我的问题是相当可分区的。

我使用这些库

pip install stanza
import stanza
stanza.download('es')
nlp = stanza.Pipeline(lang='es')
import pandas as pd
import dask.dataframe as dd
import dask
import datetime
我有以下函数
data_text = pd.DataFrame({'text': ["hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
"hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
"hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
"hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
"hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
"hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo."]})
def concept_const_func(data_text_inp):
beginning = datetime.datetime.now()
# Data
data_text_func = data_text_inp.reset_index(drop=True)
# Consolidation
df_tw_out = pd.DataFrame({'tw': ["drop"]})
for i in range(0,len(data_text_func)):
# Text
tweet_test = data_text_func["text"][i]
# NLP
doc_review = nlp(tweet_test)
# Principales Definiciones
print(i)
for sent in doc_review.sentences:
for dep in sent.dependencies:
if dep[1] == 'nsubj':
df_tw_aux = pd.DataFrame({"tw" : [dep[0].text + " " + dep[2].text]})
df_tw_out = pd.concat([df_tw_out, df_tw_aux])
ending = datetime.datetime.now()
print(ending-beginning)
return df_tw_out

当我运行带有pandas或dask延迟的代码时,我在执行时间方面得到相同的结果。

# Just Pandas
df_pd = concept_const_func(data_text)
# Dask Delayes
df_dd = dask.delayed(concept_const_func)(data_text)
df_dd.compute()

我也试图用map_partition()来解决它,但不能让它正确工作。主要是因为代码中最耗时的部分是NLP(),我不知道如何为这个需要输入str的进程使用DASK分区。

谁能想到解决这个问题的替代方案(通过划分NLP()来减少代码的执行时间)?

谢谢!

您的问题源于您正在创建一个单分区的任务数据框,它本质上是一个pandas数据框。

您应该将数据读入dask数据框架,并使用npartitions关键字参数创建适当数量的分区,以便dask可以并行化您的函数。

此处不需要延迟。映射分区应该工作良好。

我知道您正在尝试对输入数据帧应用逐行计算。在这种情况下,为了与Dask并行,您应该执行以下操作:

def nlp_apply(row):
# Text
tweet_test = row["text"]
# NLP
doc_review = nlp(tweet_test)
# Principales Definiciones
for sent in doc_review.sentences:
for dep in sent.dependencies:
if dep[1] == 'nsubj':
df_tw_aux = pd.DataFrame({"tw" : [dep[0].text + " " + dep[2].text]})
return df_tw_aux
res = ddf.apply(nlp_apply, axis=1).compute()

nlp_apply函数的返回结果可能有一些需要修改的地方。

为了正确地并行化,您还需要适当地块化任务数据框(使用几个分区,https://docs.dask.org/en/stable/dataframe-create.html),并且您可能需要使用比默认调度程序(https://docs.dask.org/en/stable/scheduling.html)更多的其他任务调度程序。

最新更新