用芹菜提交任务的禁食方式?



我正在尝试使用以下代码向芹菜提交大约 1.5 亿个工作岗位:

from celery import chain
from .task_receiver import do_work,handle_results,get_url
urls = '/home/ubuntu/celery_main/urls'
if __name__ == '__main__':
fh = open(urls,'r')
alldat = fh.readlines()
fh.close()
for line in alldat:
try:
result = chain(get_url.s(line[:-1]),do_work.s(line[:-1])).apply_async()
except:
print ("failed to submit job")
print('task submitted ' + str(line[:-1]))

将文件拆分为块并运行此代码的多个实例会更快吗?或者我能做什么?我使用 memcached 作为后端,rabbitmq 作为代理。

import multiprocessing
from celery import chain
from .task_receiver import do_work,handle_results,get_url
urls = '/home/ubuntu/celery_main/urls'
num_workers = 200
def worker(urls,id):
"""worker function"""
for url in urls:
print ("%s - %s" % (id,url))
result = chain(get_url.s(url),do_work.s(url)).apply_async() 
return
if __name__ == '__main__':
fh = open(urls,'r')
alldat = fh.readlines()
fh.close()
jobs = []
stack = []
id = 0
for i in alldat:
if (len(stack) < len(alldat) / num_workers):
stack.append(i[:-1])
continue
else:
id = id + 1
p = multiprocessing.Process(target=worker, args=(stack,id,))
jobs.append(p)
p.start()
stack = []
for j in jobs:
j.join()

如果我正确理解您的问题:

  1. 您有一个 150M 网址的列表
  2. 你想在每个网址上运行get_url((然后do_work((

所以你有两个问题:

  1. 浏览 150M 网址
  2. 对任务进行排队

关于代码中的主 for 循环,是的,如果您使用多线程,特别是如果您使用多核 CPU,您可以更快地做到这一点。您的主线程可以读取该文件并将其块传递给将创建芹菜任务的子线程。

查看指南和文档:

https://realpython.com/intro-to-python-threading/

https://docs.python.org/3/library/threading.html

现在,假设您有 1 个工作人员正在接收这些任务。该代码将生成 150M 个新任务,这些任务将被推送到队列中。每条链将是 get_url(( 和 do_work(( 的链,下一条链仅在 do_work(( 完成后运行。

如果 get_url(( 需要很短的时间,而 do_work(( 需要很长时间,它将是一系列快速任务、慢任务和总时间:

t_total_per_worker = (t_get_url_average+t_do_work_average( X 150M

如果您有 n 个辅助角色

t_total = t_total_per_worker/n

t_total = (t_get_url_average+t_do_work_average( X 150M/n

现在,如果get_url((是时间关键而do_work((不是,那么,如果可以的话,你应该先运行所有150M get_url((,完成后运行所有150M do_work((,但这可能需要更改您的流程设计。

这就是我要做的。也许其他人有更好的想法!?

最新更新