How can I organize this multithreaded Python code more elegantly?



I am building a Python service that subscribes to real-time streaming data from one message broker and publishes it to another. In some cases, such as a network disconnect or a system recovery, I also need to fetch snapshot data from a different data source. Since the streaming data arrives on one thread and some service events happen on another, I decided to create a single data-processing thread that pops items off a queue one by one. I got that working, but then I tried to move the snapshot-fetching logic into its own thread, and that is where things turned into a mess.

I know this is a long question with a lot of specific nuances, but I have tried to make the example here as clear as possible.

Here is what the first attempt looked like, and it worked fine:

import queue
import threading
import time

def process_data(data_queue, data_store):
    # data_store is my internal cache data structure.
    # For simplicity and demonstration purposes, I assume the following:
    #   if its type is dict, it's snapshot data
    #   if its type is tuple, it's a key/value pair, i.e. an incremental update
    #   if it is -1, we terminate the queue processing
    #   if it is -2, we need to retrieve a snapshot
    while True:
        x = data_queue.get()
        if isinstance(x, dict):
            data_store.on_snapshot(x)
        elif isinstance(x, tuple):
            k, v = x
            data_store.on_update(k, v)
        elif isinstance(x, int):
            if x == -1:
                data_queue.task_done()
                break
            elif x == -2:
                get_snapshot()  # this is potentially a long blocking call
            else:
                print('unknown int', x)
        else:
            print('unknown data', x)
        data_queue.task_done()

if __name__ == '__main__':
    data_store = DataStore()
    data_queue = queue.Queue()

    # start other threads that write data to the queue
    start_data_writer1(data_queue)
    start_data_writer2(data_queue)
    start_thread_for_some_event(data_queue)  # may put -2 in the queue for snapshot

    process_thread = threading.Thread(
        target=process_data,
        args=(data_queue, data_store))
    process_thread.start()

    data_queue.put(-2)  # signal a snapshot fetching
    do_something_else()
    try:
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print('terminating...')
    finally:
        # to break out of the infinite loop in process_data()
        data_queue.put(-1)
        process_thread.join()
        data_queue.join()

This works, but I don't particularly like calling get_snapshot() inside the processing thread. The whole point of the processing thread, as I see it, is to stay busy popping data off the queue unless there is nothing to pop. In the implementation above, the queue can build up from the other writer threads while the get_snapshot() call is in progress.

So I tried something else, and I also wanted the program to exit gracefully. This is where things got really ugly. I created a new thread to fetch snapshots occasionally and used a Condition object for the thread communication. Here is what I added on top of the existing code:

snapshot_lock = threading.Lock()
cond = threading.Condition(snapshot_lock)
need_for_snapshot = False    # used to trigger snapshots
keep_snapshot_thread = True  # flag telling the snapshot thread whether to keep running

# then I need to add this new function to run snapshot fetching
def fetch_snapshot(data_queue):
    global need_for_snapshot
    global keep_snapshot_thread

    def _need_snapshot():
        return need_for_snapshot

    while True:
        with cond:
            cond.wait_for(_need_snapshot)
            if not keep_snapshot_thread:
                break
            data_queue.put(get_snapshot())  # the long blocking function
            need_for_snapshot = False
# In process_data(), everything stays the same except for the `if` branch handling `x == -2`
def process_data(data_queue, data_store):
    global need_for_snapshot
    while True:
        x = data_queue.get()
        # ...omitting some old code...
        elif isinstance(x, int):
            if x == -1:
                data_queue.task_done()
                break
            elif x == -2:
                # this is the changed part: ask the snapshot thread to do the work
                with cond:
                    need_for_snapshot = True
                    cond.notify()
        # more code omitted
if __name__ == '__main__':
    # same code as before except for the finally part
    try:
        # start other threads...omitting some code
        # when a snapshot is needed in those threads, they do the following:
        # with cond:
        #     need_for_snapshot = True
        #     cond.notify()

        # start the snapshot worker thread
        snapshot_thread = threading.Thread(
            target=fetch_snapshot, args=(data_queue,))
        process_thread = threading.Thread(
            target=process_data,
            args=(data_queue, data_store))
        snapshot_thread.start()
        process_thread.start()

        data_queue.put(-2)  # signal fetching a snapshot
        # omitting more code here...
    finally:
        keep_snapshot_thread = False
        # We don't technically need another snapshot now, but the code below
        # unblocks cond.wait_for(); since the keep_snapshot_thread flag has just
        # been flipped, the fetch_snapshot thread uses it to break out of its loop.
        # This is the part that feels hacky...
        with cond:
            need_for_snapshot = True
            cond.notify()
        snapshot_thread.join()
        data_queue.put(-1)  # signal the termination of process_thread
        process_thread.join()
        data_queue.join()

I think I got it to work, and in particular the program exits gracefully when I hit Ctrl-C, but it is so ugly and fiddly that I had to hack around quite a bit to get it right.

Is there a way to write this more elegantly? Is there some pattern that is typically used to solve this kind of problem? Thanks a lot for your help.

The standard technique for handling multiple producers and multiple consumers is to use an Event `is_done` and a joinable queue `work`.

Your workers just do:

while not is_done.is_set():
    try:
        job = work.get(timeout=5)
    except queue.Empty:
        continue
    # handle the job
    work.task_done()

Your main thread does the following:

start the jobs that produce work
wait for them to be done
work.join()    # wait for the queue to be empty
is_done.set()  # tell the workers they can exit
perform any cleanup necessary
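
To make that concrete, here is a minimal, runnable sketch of the pattern applied to your setup, under a few assumptions of mine: the snapshot fetch runs in its own producer thread fed by a separate snapshot_requests queue, and get_snapshot() / DataStore below are trivial stubs standing in for your real blocking call and cache. The names are illustrative, not part of any library.

import queue
import threading
import time

is_done = threading.Event()        # set when the workers should exit
work = queue.Queue()               # carries both incremental updates and snapshots
snapshot_requests = queue.Queue()  # replaces the -2 sentinel: one item per requested snapshot

def get_snapshot():
    # stand-in for your real long blocking snapshot call
    time.sleep(2)
    return {'full': 'snapshot'}

class DataStore:
    # stand-in for your real cache
    def on_snapshot(self, snap):
        print('snapshot', snap)
    def on_update(self, k, v):
        print('update', k, v)

def snapshot_worker():
    # blocks on get_snapshot() without ever stalling the data worker;
    # the finished snapshot is just another item on the work queue
    while not is_done.is_set():
        try:
            snapshot_requests.get(timeout=5)
        except queue.Empty:
            continue
        work.put(get_snapshot())
        snapshot_requests.task_done()

def data_worker(data_store):
    while not is_done.is_set():
        try:
            x = work.get(timeout=5)
        except queue.Empty:
            continue
        if isinstance(x, dict):
            data_store.on_snapshot(x)
        elif isinstance(x, tuple):
            data_store.on_update(*x)
        work.task_done()

if __name__ == '__main__':
    data_store = DataStore()
    threads = [threading.Thread(target=snapshot_worker),
               threading.Thread(target=data_worker, args=(data_store,))]
    for t in threads:
        t.start()
    snapshot_requests.put(None)  # ask for an initial snapshot
    work.put(('k', 'v'))         # pretend a writer produced an update
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        work.join()    # wait for queued items to be handled
        is_done.set()  # tell the workers they can exit
        for t in threads:
            t.join()

With this shape, Ctrl-C only has to set the event: neither worker needs the -1/-2 integer sentinels, and the blocking snapshot call can never starve the queue consumer.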

Note that the goal is to decouple the workers and the producers as much as possible. Trying to build complicated logic that ties them together will almost certainly produce race conditions.

Another option is to create a sentinel object ("end") that indicates everything is done. Once all the producers are finished, the main thread pushes as many sentinel objects onto the work queue as there are workers and calls work.join(). Each worker thread calls work.get() in a loop and exits when it sees the sentinel. Remember to call work.task_done() on the sentinel as well.
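
A minimal runnable sketch of this sentinel approach, assuming two worker threads and trivial print-based job handling (SENTINEL and NUM_WORKERS are illustrative names, not anything from the standard library):

import queue
import threading

SENTINEL = object()  # unique "end" marker that can never collide with real data
NUM_WORKERS = 2
work = queue.Queue()

def worker():
    while True:
        job = work.get()
        if job is SENTINEL:
            work.task_done()  # remember to call task_done() on the sentinel too
            break
        print('handling', job)  # handle the job here
        work.task_done()

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for i in range(10):  # the producers put real jobs on the queue
    work.put(i)

# once all the producers are finished, push one sentinel per worker
for _ in range(NUM_WORKERS):
    work.put(SENTINEL)
work.join()  # wait until every item, sentinels included, has been handled
for t in threads:
    t.join()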

Again: you want simple logic, and you want to use the tools that the threading module provides.