Im从一个进程(a1(启动三个进程(a2、a3、a4(。这三个(子(进程需要将数据发送到另一个进程(b1(。我不能在a2、a3、a4的(子(进程中创建这个进程(b1((这是一个被调用三次的函数(,因为它将生成三个进程,而不是一个。因此,队列的处理似乎变得困难起来。所有的函数都在无休止地循环。我该如何解决这个问题?这里有一个简单的代码形式:
import multiprocessing
from multiprocessing import Queue, Process
def Process_B(): #q4: multiprocessing.Queue
while True:
if q4.qsize() > 0:
var1, var2, var3 = q4.get()
print(var1, var2, var3)
#here is where i cant get the data
def Process_A2_3_4(q123: multiprocessing.Queue):
supProcesses = SubProcesses123()
while True:
if q123.qsize() > 0:
var1, var2, var3 = q123.get()
supProcesses.doSomething(var1, var2, var3)
class SubProcesses123:
def __init__(self, name):
self.name = name
def doSomething(self, var1, var2, var3):
print('doing something')
#this needs to put something in a Queue
#same Queue for all three Processes(a2,a3,a4)
#Data will be received by Function: Process_B
class Process_A:
def __init__(self):
self.a2 = Queue()
self.a3 = Queue()
self.a4 = Queue()
self.startProcess234()
self.doSomething()
def startProcess234(self):
a2 = Process(target=Process_A2_3_4, args=(self.a2, "a2"))
a3 = Process(target=Process_A2_3_4, args=(self.a3, "a3"))
a4 = Process(target=Process_A2_3_4, args=(self.a4, "a4"))
a2.start()
a3.start()
a4.start()
def doSomething(self):
print('doing something')
if __name__ == '__main__':
retrieve = Process(target=Process_A)
retrieve.start()
因此,需要创建一个单独的队列,进程(a2、a3、a4(将使用该队列向Process_B发送数据。问题是怎么做?当我在main或Process_a中创建一个队列并将其提供给Process_A2_3_4时,它确实在队列中放置了一些东西(至少没有错误(,但我无法从其他进程中的队列中检索数据。任何帮助都将不胜感激。
这就是你想要做的事情。创建一个队列来馈送输入的dudes("A"进程(,并为这些dudes创建一个排队来将其输出馈送到合并器dude("B"进程(。
import multiprocessing
from multiprocessing import Queue, Process
import time
def Process_B(q4):
while True:
var1, var2, var3 = q4.get()
print("in B", var1, var2, var3)
def Process_A2_3_4(q123, q4, name):
print(name,"starting")
supProcesses = SubProcesses123(q4, name)
while True:
var1, var2, var3 = q123.get()
supProcesses.doSomething(var1, var2, var3)
class SubProcesses123:
def __init__(self, q, name):
self.name = name
self.q = q
def doSomething(self, var1, var2, var3):
print('in SP123', var1, var2, var3)
self.q.put( (var1,var2,var3) )
class Process_A:
def __init__(self):
self.q1 = Queue()
self.q2 = Queue()
self.startProcess234()
for _ in range(20):
self.q1.put( (1,2,3) )
time.sleep(1)
def startProcess234(self):
a2 = Process(target=Process_A2_3_4, args=(self.q1, self.q2, "a2"))
a3 = Process(target=Process_A2_3_4, args=(self.q1, self.q2, "a3"))
a4 = Process(target=Process_A2_3_4, args=(self.q1, self.q2, "a4"))
b = Process(target=Process_B, args=(self.q2,))
a2.start()
a3.start()
a4.start()
b.start()
def doSomething(self):
print('doing something')
if __name__ == '__main__':
retrieve = Process(target=Process_A)
retrieve.start()