进程间共享队列



我刚接触 Python 的 multiprocessing,想实现一个应该相当常见的功能,但在网上找不到简单的做法。

我想把数据放进一个队列,让多个不同的消费者函数都能使用这个队列。关键在于:从队列中取元素时,每个消费者函数都应该拿到相同的元素(即每个元素都要分发给所有消费者)。下面的例子应该能清楚地说明我想实现的目标:

from multiprocessing import Process, Queue
def producer(q):
    """Put the ints 0..9 on *q*, followed by a single None end-of-data sentinel.

    Only one sentinel is enqueued. A queue hands each item to exactly one
    ``get()`` caller, so consumers sharing *q* split the items between them,
    and only one of them ever receives the sentinel.
    """
    for item in list(range(10)) + [None]:
        q.put(item)
def consumer1(q):
    """Print each item taken from *q*, one per line, stopping at the first None."""
    item = q.get()
    while item is not None:
        print(item)
        item = q.get()
def consumer2(q):
    """Drain *q*, printing every item on its own line, until a None sentinel arrives."""
    item = q.get()
    while item is not None:
        print(item)
        item = q.get()
def main():
    """Run one producer and two consumers that all share a single queue.

    Processes are started in order (producer, consumer1, consumer2) and then
    joined in the same order.
    """
    q = Queue()
    procs = [Process(target=fn, args=(q,)) for fn in (producer, consumer1, consumer2)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()


if __name__ == '__main__':
    main()

运行后脚本不会终止,而且我只看到其中一个函数的打印输出,看来这样写是不对的。共享队列时是不是有什么需要特别注意的地方?只使用一个消费者函数时一切正常。感谢帮助!

如果您要存储的值可以用 ctypes 模块中定义的某种基本数据类型表示,就可以使用下面的方法。这里我们实现了一个可以保存 int 值或 None 的"队列",每个消费者都能读到放入队列的全部值:

from multiprocessing import Process, Condition
import ctypes
from multiprocessing.sharedctypes import RawArray, RawValue
from threading import local
import time
class _ThreadCursor(local):
    """Thread-local read position; each thread's cursor starts at 0."""

    def __init__(self):
        # threading.local runs __init__ again the first time each new thread
        # touches the object. So every thread, not only the one that imported
        # this module, gets its own ``current`` initialised to 0.
        self.current = 0


my_local = _ThreadCursor()
class StructuredInt(ctypes.Structure):
    """
    One slot of a shared-memory int queue that can also represent None.

    A plain array of ``ctypes.c_void_p`` could encode None, but a void pointer
    holding 0 also reads back as None, so 0 would be indistinguishable from
    None. This struct therefore keeps the payload and the "is None" marker in
    separate fields:

    * stored None:  ``ptr`` is ``ctypes.c_void_p(None)``; ``value`` is ignored.
    * stored int:   ``ptr`` is a non-null marker (``ctypes.c_void_p(1)``) and
      ``value`` holds the int itself.
    """
    _fields_ = [
        ('ptr', ctypes.c_void_p),   # None -> slot holds None; non-null -> slot holds an int
        ('value', ctypes.c_int),    # the stored int (meaningless when ptr is None)
    ]
class MultiIntQueue:
    """
    An append-only "broadcast" queue of ints (or None) shared between processes.

    Every reader (each thread of each process using the queue) keeps its own
    read position, so every reader retrieves every value put into the queue,
    in order. Storage is a fixed-size shared array that is never recycled, so
    at most ``maxsize`` values can ever be put into one queue.

    :param maxsize: The maximum queue capacity (defaults to 20 if specified as None)
    :type maxsize: int
    """

    def __init__(self, maxsize=None):
        if maxsize is None:
            maxsize = 20
        self.maxsize = maxsize
        self.q = RawArray(StructuredInt, maxsize)
        self.condition = Condition()
        self.size = RawValue(ctypes.c_int, 0)
        # Per-thread read cursor owned by *this* queue. A module-global cursor
        # would be shared by every queue instance in the process.
        self._reader = local()

    def __getstate__(self):
        # threading.local objects cannot be pickled, and the "spawn" /
        # "forkserver" start methods pickle Process arguments. Drop the cursor;
        # the receiving process starts with fresh cursors at 0.
        state = self.__dict__.copy()
        state.pop('_reader', None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._reader = local()

    def _cursor(self):
        # A thread that has not read from this queue yet starts at index 0.
        return getattr(self._reader, 'current', 0)

    def get(self):
        """
        Return the next value for the calling thread, blocking until one exists.

        :return: the stored int, or None if None was put at that position
        """
        with self.condition:
            index = self._cursor()
            while index >= self.size.value:
                self.condition.wait()
            item = self.q[index]
            # ``item`` is a view into the shared array; read its fields while
            # the lock is still held.
            value = None if item.ptr is None else item.value
        self._reader.current = index + 1
        return value

    def put(self, i):
        """
        Append ``i`` (an int or None) and wake every waiting reader.

        :raises OverflowError: if ``maxsize`` values have already been put
        """
        with self.condition:
            size = self.size.value
            # Check capacity under the lock so concurrent producers cannot both
            # pass the check and write past the end of the array. An explicit
            # raise (rather than assert) also survives ``python -O``.
            if size >= self.maxsize:
                raise OverflowError('MultiIntQueue is full (maxsize=%d)' % self.maxsize)
            self.q[size] = (ctypes.c_void_p(None), 0) if i is None else (ctypes.c_void_p(1), i)
            self.size.value = size + 1
            self.condition.notify_all()

def producer(q):
    """Feed 0..9 into *q*, pausing 0.3 s after each one, then send a None end marker."""
    for item in list(range(10)) + [None]:
        q.put(item)
        if item is not None:
            time.sleep(.3)  # simulate processing
def consumer1(q):
    """Take values from *q* until None, printing each as 'Consumer 1: <value>'."""
    data = q.get()
    while data is not None:
        time.sleep(.1)  # simulate processing
        print('Consumer 1:', data)
        data = q.get()
def consumer2(q):
    """Take values from *q* until None, printing each as 'Consumer 2: <value>'."""
    data = q.get()
    while data is not None:
        time.sleep(.1)  # simulate processing
        print('Consumer 2:', data)
        data = q.get()
def main():
    """Share one MultiIntQueue between a producer and two consumer processes.

    Processes are started in order (producer, consumer1, consumer2) and then
    joined in the same order.
    """
    q = MultiIntQueue()
    procs = [
        Process(target=producer, args=(q,)),
        Process(target=consumer1, args=(q,)),
        Process(target=consumer2, args=(q,)),
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()


if __name__ == '__main__':
    main()

打印:

Consumer 1: 0
Consumer 2: 0
Consumer 2: 1
Consumer 1: 1
Consumer 2: 2
Consumer 1: 2
Consumer 2: 3
Consumer 1: 3
Consumer 2: 4
Consumer 1: 4
Consumer 1: 5
Consumer 2: 5
Consumer 1: 6
Consumer 2: 6
Consumer 1: 7
Consumer 2: 7
Consumer 2: 8
Consumer 1: 8
Consumer 1: 9
Consumer 2: 9

你的问题中有一个误解:

> "所有消费者函数都应该获得相同的元素"

队列不是这样工作的。队列由内部机制自动管理(底层做了不少工作):放入一个元素,它就只能被取出一次,不会被复制给所有消费者。这也解释了脚本为什么不终止:唯一的 None 哨兵只会被其中一个消费者取走,另一个消费者会一直阻塞在 q.get() 上。看起来你实际上需要两个独立的队列,才能保证每个消费者都拿到每一个输入,而不必与其他消费者争抢:

from multiprocessing import Process, Queue
def producer(q1, q2):
    """Send 0..9 and then a None sentinel to *both* queues.

    Each item goes to q1 and then to q2 before the next item is produced, so
    each consumer, reading its own queue, sees the full sequence.
    """
    for item in list(range(10)) + [None]:
        for target in (q1, q2):
            target.put(item)
def consumer1(q):
    """Print every item from *q* on its own line until the None sentinel is received."""
    item = q.get()
    while item is not None:
        print(item)
        item = q.get()
def consumer2(q):
    """Read *q* until None, printing each received item on a separate line."""
    item = q.get()
    while item is not None:
        print(item)
        item = q.get()
def main():
    """Give each consumer its own queue so both receive every item the producer sends.

    Processes are started in order (producer, consumer1, consumer2) and then
    joined in the same order.
    """
    q1, q2 = Queue(), Queue()
    procs = [
        Process(target=producer, args=(q1, q2)),
        Process(target=consumer1, args=(q1,)),
        Process(target=consumer2, args=(q2,)),
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()


if __name__ == '__main__':
    main()

最新更新