Redis 队列中的并发性



我正在使用一个托管在heroku上的django应用程序,其中包含redistogo addon:nano pack。我正在使用 rq 在后台执行任务 - 任务由在线用户启动。我有增加连接数量的限制,恐怕资源有限。

我目前有一个工人运行超过"n"个队列。每个队列使用连接池中的连接实例来处理"n"种不同类型的任务。例如,假设 4 个用户启动相同类型的任务,我希望让我的主要工作线程动态创建子进程来处理它。有没有办法实现所需的多处理和并发?

我尝试使用multiprocessing模块,最初没有引入Lock();但这暴露并覆盖了用户传递给启动函数的数据,以及以前的请求数据。应用锁后,它通过返回server error - 500来限制第二个用户发起请求

github链接#1:看起来团队正在做PR;虽然还没有发布!

github链接#2:这篇文章有助于解释在运行时创建更多worker。 但是,此解决方案也会覆盖数据。新请求再次使用以前的请求数据进行处理。

如果您需要查看一些代码,请告诉我。我将尝试发布一个最小的可重现片段。

有什么想法/建议/指导方针吗?

你有机会尝试AutoWorker吗?

自动生成 RQ 工作线程。

from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()

它利用redis模块StrictRedismultiprocessing,并从rq导入

from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus

在深入了解后,我意识到Worker类已经在实现多处理。

work函数在内部调用execute_job(job, queue)而又如模块中引用的那样

生成一匹工作马来执行实际工作并传递给它一个作业。

工作线程将等待工作马并确保它在给定的超时范围内执行,

或者将用 SIGALRM 结束工作马。

execute_job()函数隐式调用fork_work_horse(job, queue),该函数会生成一匹工作马来执行实际工作,并根据以下逻辑向其传递作业:


def fork_work_horse(self, job, queue):
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
self.main_work_horse(job, queue)
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))

main_work_horseperform_job(job, queue)进行内部调用,进行一些其他调用以实际执行作业。

rq 官方文档页面上提到的有关工作线程生命周期的所有步骤都在这些调用中得到了处理。

这不是我所期望的多处理,但我想他们有做事的方式。但是我的原始帖子仍然没有回答这个问题,我也仍然不确定并发性。

那里的文档仍然需要处理,因为它几乎无法涵盖这个库的真正本质!

相关内容

  • 没有找到相关文章

最新更新