Python multiprocessing: add tasks to a queue, but prevent them from being picked up for a given time



I'm using multiprocessing with several workers (subclasses of multiprocessing.Process) and queues (multiprocessing.JoinableQueue) to implement a complex data-manipulation workflow.

One of the workers (JobSender) submits jobs to a remote system (a web service), which immediately returns an identifier. Those jobs can take a very long time to complete, so I have another worker (StatusPoller) responsible for polling the remote system for the status of a job. To that end, JobSender adds the identifier to a queue that StatusPoller uses as its input. If the job is not complete, StatusPoller puts the identifier back on the same queue. If the job is complete, StatusPoller retrieves the result information and adds it to a list (multiprocessing.Manager.list()).

My problem: I don't want to hammer the remote system with constant status requests, which is what happens in my current setup. I want to introduce a delay somewhere to ensure that status polling for any given identifier happens only once every 20 seconds.

Currently I achieve this with a time.sleep(20) before StatusPoller puts the identifier back on the queue. But that means StatusPoller now sits idle for 20 seconds and cannot pick up another polling task from the queue. I will have multiple StatusPollers, but I can't have one per job (there could be hundreds of jobs).

import multiprocessing
import time

class StatusPoller(multiprocessing.Process):
    def __init__(self, polling_queue, results_queue, errors_queue):
        multiprocessing.Process.__init__(self)
        self.polling_queue = polling_queue
        self.results_queue = results_queue
        self.errors_queue = errors_queue

    def run(self):
        while True:
            # Pick a task from the queue
            next_id = self.polling_queue.get()
            # Poison pill => shutdown
            if next_id == 'END':
                self.polling_queue.task_done()
                break
            # Process the task (remote_system is the web-service client,
            # defined elsewhere)
            response = remote_system.get_status(next_id)
            if response == "IN_PROGRESS":
                time.sleep(20)
                self.polling_queue.put(next_id)
            else:
                self.results_queue.put(response)
            self.polling_queue.task_done()

Any idea how to implement such a workflow?

Given that the multiprocessing.Process and threading.Thread classes can be instantiated with the target keyword, I consider actually subclassing these classes to be an anti-pattern: you lose some flexibility and reuse. In fact, in your case, since StatusPoller is mostly just waiting on a queue or on a reply from the network, multithreading should suffice, especially since, as you say, you may have "hundreds" of them. I also see no need for a joinable queue in your current code.
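As an aside, here is a minimal sketch of the target-keyword style; the poll function, queue contents, and result format are illustrative stand-ins for the real workers:

```python
import threading
import queue

def poll(polling_queue, results_queue):
    # Worker loop as a plain function: no subclassing required.
    while True:
        next_id = polling_queue.get()
        if next_id == 'END':   # poison pill => shutdown
            break
        results_queue.put(f"processed {next_id}")

polling_q = queue.Queue()
results_q = queue.Queue()
# Pass the function via the target keyword; the same function could be
# handed to threading.Thread or multiprocessing.Process unchanged.
t = threading.Thread(target=poll, args=(polling_q, results_q))
t.start()
polling_q.put('job-1')
polling_q.put('END')
t.join()
```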

I would therefore suggest using multithreading with a regular queue.Queue instance and a single instance of a Scheduler class adapted from the sched module's sched.scheduler; because the code has been made thread-safe, that one scheduler can be shared among all StatusPoller instances. Here is the general idea:

from threading import Thread
from queue import Queue
import time

#########################################################
# Start of modified sched.scheduler code
# Heavily modified from sched.scheduler
#########################################################
import heapq
import threading
from collections import namedtuple
from time import monotonic as _time

class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')):
    __slots__ = []
    def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority)
    def __lt__(s, o): return (s.time, s.priority) <  (o.time, o.priority)
    def __le__(s, o): return (s.time, s.priority) <= (o.time, o.priority)
    def __gt__(s, o): return (s.time, s.priority) >  (o.time, o.priority)

_sentinel = object()

class Scheduler():
    """
    Code modified from sched.scheduler
    """
    delayfunc = time.sleep

    def __init__(self, timefunc=_time):
        """Initialize a new instance, passing the time functions"""
        self._queue = []
        self.timefunc = timefunc
        self.got_event = threading.Condition(threading.RLock())
        self.thread_started = False

    def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary.
        """
        if kwargs is _sentinel:
            kwargs = {}
        event = Event(time, priority, action, argument, kwargs)
        with self.got_event:
            if not self.thread_started:
                self.thread_started = True
                threading.Thread(target=self.run, daemon=True).start()
            heapq.heappush(self._queue, event)
            # Show new Event has been entered:
            self.got_event.notify()
        return event  # The ID

    def cancel(self, event):
        """Remove an event from the queue.

        This must be presented the ID as returned by enter().
        If the event is not in the queue, this raises ValueError.
        """
        with self.got_event:
            self._queue.remove(event)
            heapq.heapify(self._queue)

    def enter(self, delay, priority, action, argument=(), kwargs=_sentinel):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface.
        """
        time = self.timefunc() + delay
        return self.enterabs(time, priority, action, argument, kwargs)

    def empty(self):
        """Check whether the queue is empty."""
        with self.got_event:
            return not self._queue

    def run(self):
        """Execute events until the queue is empty."""
        # localize variable access to minimize overhead
        # and to improve thread safety
        got_event = self.got_event
        q = self._queue
        timefunc = self.timefunc
        delayfunc = self.delayfunc
        pop = heapq.heappop
        while True:
            try:
                while True:
                    with got_event:
                        got_event.wait_for(lambda: len(q) != 0)
                        time, priority, action, argument, kwargs = q[0]
                        now = timefunc()
                        if time > now:
                            # Wait for either the time to elapse or a new
                            # event to be added:
                            got_event.wait(timeout=(time - now))
                            continue
                        pop(q)
                    action(*argument, **kwargs)
                    delayfunc(0)   # Let other threads run
            except:
                pass

    @property
    def queue(self):
        """An ordered list of upcoming events.

        Events are named tuples with fields for:
            time, priority, action, arguments, kwargs
        """
        # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
        # With heapq, two events scheduled at the same time will show in
        # the actual order they would be retrieved.
        with self.got_event:
            events = self._queue[:]
        return list(map(heapq.heappop, [events]*len(events)))
#########################################################
# End of modified sched.scheduler code
#########################################################

def re_queue(polling_queue, id):
    polling_queue.put(id)

class StatusPoller:
    scheduler = Scheduler()

    def __init__(self, polling_queue, results_queue, errors_queue):
        self.polling_queue = polling_queue
        self.results_queue = results_queue

    def run(self):
        while True:
            # Pick a task from the queue
            next_id = self.polling_queue.get()
            # Poison pill => shutdown
            if next_id == 'END':
                break
            # Process the task
            response = remote_system.get_status(next_id)
            if response == "IN_PROGRESS":
                self.scheduler.enter(20, 1, re_queue, argument=(self.polling_queue, next_id))
            else:
                self.results_queue.put(response)
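The modified Scheduler above runs its event loop in a daemon thread so that enter() returns immediately. The core delayed-re-queue idea can be seen with the stock, blocking sched.scheduler from the standard library; a minimal sketch with an illustrative job identifier and a short delay in place of the 20 seconds:

```python
import sched
import time
import queue

polling_queue = queue.Queue()

def re_queue(q, job_id):
    # Put the identifier back so a poller can pick it up again later.
    q.put(job_id)

s = sched.scheduler(time.monotonic, time.sleep)
# Schedule the re-queue 0.1 s out instead of blocking the worker for
# the full delay; the answer's Scheduler runs this loop in a daemon
# thread shared by all pollers.
s.enter(0.1, 1, re_queue, argument=(polling_queue, 'job-42'))
s.run()  # blocking here; the modified Scheduler does not block callers
requeued = polling_queue.get_nowait()  # the identifier is available again
```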

Explanation

First, why did I say I see no reason for using a JoinableQueue? The run method is programmed to return when it finds an 'END' message in its input. But because this method re-queues a message back onto the polling_queue whenever it gets an "IN_PROGRESS" response from the remote system, there is the possibility that when 'END' is received and run terminates, one or more of those re-queued messages remain on the queue. How, then, could another process or thread rely on calling polling_queue.join() without hanging? It can't.

Instead, if you have N processes or threads (we haven't yet decided which) doing get requests against a single queue instance, it suffices to put N 'END' shutdown messages on the queue. That will cause the N processes to terminate. If the main process wants to block until those processes/threads have actually terminated, it now joins not the queue but the N processes or threads.
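A sketch of that shutdown protocol with threads (the worker body and items are illustrative):

```python
import threading
import queue

def worker(q, results):
    while True:
        item = q.get()
        if item == 'END':      # poison pill: one per worker
            break
        results.append(item * 2)

N = 3
q = queue.Queue()
results = []
threads = [threading.Thread(target=worker, args=(q, results)) for _ in range(N)]
for t in threads:
    t.start()
for item in (1, 2, 3):
    q.put(item)
for _ in range(N):             # one 'END' per worker shuts them all down
    q.put('END')
for t in threads:              # join the threads, not the queue
    t.join()
```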

The way I would use a JoinableQueue (and I don't believe it fits your use case) is when the processes/threads sit in infinite loops that never terminate, that is, never exit "prematurely" and therefore never leave items on the queue. You would make those processes/threads daemons so that they eventually end when the main process finally terminates; consequently, you cannot use 'END' messages to force termination. So I just don't see how a JoinableQueue would work here, but if I have misunderstood something, you can point it out to me.
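For contrast, here is the threading analogue of that pattern; queue.Queue implements the same task_done()/join() protocol as multiprocessing.JoinableQueue (the worker body is illustrative):

```python
import threading
import queue

q = queue.Queue()
processed = []

def worker():
    # Infinite loop: no poison pill, the thread never returns.
    while True:
        item = q.get()
        processed.append(item + 1)
        q.task_done()          # pair every get() with a task_done()

# Daemon thread: it dies with the main thread, so no 'END' is needed.
threading.Thread(target=worker, daemon=True).start()
for n in range(5):
    q.put(n)
q.join()                       # blocks until every queued item is task_done()
```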

Yes, StatusPoller could be the target of a Process instance (or even a subclass of Process, as you originally had, although I see no benefit to that beyond it being how the code is currently written). But it seems to me it will be spending most of its time waiting on a queue or waiting for a network response. In both cases it releases the Global Interpreter Lock, so multithreading should perform very well. Threads also take far fewer resources if we really are talking about creating hundreds of instances of these tasks, especially when running under Windows. With processes you would also be unable to share a scheduler running in its own thread across all StatusPoller instances: a separate scheduler would run in each process, since each StatusPoller would be running in its own process.