I have the following problem: I have a dict with a few hundred keys (still about 150 MB), and each key holds a complex value made up of dicts, lists and single values. I have 3 incoming streams of information, timed at 1 s, 0.1 s and real-time respectively, depending on the data type. To speed up the data processing, I want to use multiprocessing to create 3 processes for the different sources, ideally each with its own pool for a further speedup.
The question is how to "chop" the general dict into independently updatable pieces. As far as I can tell, when using a pool or process I have to fix the argument list right when the process/pool is initialized. My task needs something like this: I receive a message saying key "A" needs updating. I assign a worker to update it, passing it the message with the new information plus the complex object "A" (or at least the relevant value of "A"). I definitely do not want to pass the whole dict to every worker, since it takes up a lot of memory.
In the example code below, I would like to pass only general_dict['A']['a'] when processing the first element of example_data_a, general_dict['B']['a'] for the third element, and so on. How can I pass the arguments?
general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                'B': {'a': [3, 4, 5], 'b': 'test2'},
                'C': {'a': [6, 7, 8], 'b': 'test3'}}

example_data_a = ['A', [2, 1, 2],
                  'A', [2, 3, 2],
                  'B', [3, 0, 5],
                  'C', [6, 1, 8]]

example_data_b = ['A', 'test11',
                  'B', 'test21',
                  'B', 'test22',
                  'C', 'test31']
import multiprocessing

def update_a(x):
    ...

def update_b(y):
    ...

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=update_a)
    p2 = multiprocessing.Process(target=update_b)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
I see what you are after. The problem, though, is that any key can arrive on any of the three streams, so splitting the work by stream does not sound like the most workable approach. In my view you should have a single process that handles the input streams. There should also be no need to split the dictionary. Instead, you have three processes, each handling roughly a third of the keys as you envisioned. Each process is started once at the beginning and is passed its own multiprocessing.Queue instance as its input queue; all three are also passed a common result queue on which they pass back their return values. A thread started by the main process continually reads the result queue and updates the dictionary with the returned values.
Here is the general idea:
from multiprocessing import Process, Queue
from threading import Thread

def update_a(input_queue, result_queue):
    while True:
        # Wait for next request:
        x = input_queue.get()
        if x is None:
            # This is a sentinel indicating a request to terminate.
            # Put a sentinel on the result queue to let the process_results
            # thread know that no more results are coming from this process.
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)
def update_b(input_queue, result_queue):
    while True:
        # Wait for next request:
        y = input_queue.get()
        if y is None:
            # This is a sentinel indicating a request to terminate.
            # Put a sentinel on the result queue to let the process_results
            # thread know that no more results are coming from this process.
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)
def update_c(input_queue, result_queue):
    while True:
        # Wait for next request:
        z = input_queue.get()
        if z is None:
            # This is a sentinel indicating a request to terminate.
            # Put a sentinel on the result queue to let the process_results
            # thread know that no more results are coming from this process.
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)
def process_results():
    sentinels_seen = 0
    # Loop until all 3 processes have finished:
    while sentinels_seen < 3:
        # Get next result:
        result = result_queue.get()
        if result is None:
            # Sentinel
            sentinels_seen += 1
        else:
            # Update general_dict with result:
            ...
def process_input_stream():
    while True:
        # When we have decided that we are through processing input,
        # break out of the loop:
        if through_processing:
            break
        # Get input from one of the 3 sources and, depending on the key,
        # put the "argument" on either a_q, b_q or c_q to be handled
        # respectively by update_a, update_b or update_c.
        # The result will be put on the result queue, which is processed by
        # our process_results thread.
        ...
    # Add a sentinel to each of the input queues:
    a_q.put(None)
    b_q.put(None)
    c_q.put(None)
if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q = Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    t = Thread(target=process_results)
    p1.start()
    p2.start()
    p3.start()
    t.start()
    process_input_stream()
    p1.join()
    p2.join()
    p3.join()
    t.join()
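The elided routing step inside process_input_stream could look like the sketch below. The helper name route_message and the hash-based split of keys across workers are assumptions for illustration; any stable key-to-queue mapping works:

```python
from multiprocessing import Queue

def route_message(key, payload, queues):
    # Hypothetical dispatcher: send one (key, payload) message to the input
    # queue of the worker responsible for this key. A hash-based split keeps
    # each key pinned to one worker, so updates to the same key stay ordered.
    queues[hash(key) % len(queues)].put((key, payload))

# Example: three input queues, one per worker process.
input_queues = [Queue(), Queue(), Queue()]
route_message('A', [2, 1, 2], input_queues)
```

Because the worker only ever receives the message itself, the 150 MB dict never has to be pickled and sent anywhere; only small (key, payload) tuples cross process boundaries.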
Note:
If you find that there is too much contention between the process_results thread and the process_input_stream loop, because the GIL prevents the latter from keeping up with the input streams, then do not start and join a process_results thread. Instead, just start and join the three processes as before and then call process_results as a plain function in the main process. Of course, you give up that bit of concurrency this way:
if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q = Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    p1.start()
    p2.start()
    p3.start()
    process_input_stream()
    p1.join()
    p2.join()
    p3.join()
    process_results()