我希望有两个并行运行的进程。一个从另一个获取输入,处理数据并将处理后的数据作为另一个的输出发送回去。另一个进程执行相同的操作。显然,需要有一个起点和一个终点。
如何在进程运行时在进程之间进行通信?我只设法依次运行两个进程。
我试图用multiprocessing
来解决它:
from multiprocessing import Process, Queue, Array
sentinel = -1
def process1(q, arr):
# Receives data, modifies it and sends it back
while True:
data = q.get() # Receive data
if data is sentinel:
break
data *= 2 # Data modification
arr.append(data) # Data logging
q.put(data) # Send data
q.put(sentinel) # Send end signal
def process2(q, arr):
# Receives data, increments it and sends data back
if q.empty():
data = 1
else:
while True:
data = q.get() # Receive data
if data == sentinel:
break
data += 1
q.put(data) # Send data
arr.append(data) # Data logging
q.put(sentinel) # Send end signal
if __name__ == "__main__":
q = Queue()
logbook = Array('i', 0)
counter = 0
while counter < 10:
process_1 = Process(target=process1, args=(q, logbook))
process_2 = Process(target=process2, args=(q, logbook))
process_1.start()
process_2.start()
q.close()
q.join_thread()
process_1.join()
process_2.join()
counter += 1
print(logbook)
我试图理解您的需求,但我并不完全清楚,因此我提出了这个代码的生产者-消费者版本,其中两个进程进行通信以达到一定数量的迭代的最终结果。
首先,您需要两个队列,以避免将内容放入队列的同一进程在另一个进程之前读取它。 其次,您需要一种机制来就计算结束达成一致,在本例中为 None 消息。
我建议的解决方案总结在以下代码中:
from multiprocessing import Process, Queue, Array
def process1(in_queue, out_queue):
# Receives data, modifies it and sends it back
while True:
data = in_queue.get() # Receive data
if data is None:
out_queue.put(None) # send feedback about END message
out_queue.close()
out_queue.join_thread()
break
data *= 2 # Data modification
out_queue.put(data) # Send data
def process2(in_queue, out_queue, how_many):
data = 0
# Receives data, increments it and sends data back
while how_many > 0:
data += 1 # Data modification
out_queue.put(data) # Send data
how_many -= 1
data = in_queue.get() # Receive data
if data is None:
break
# send END message
out_queue.put(None)
out_queue.close()
out_queue.join_thread()
assert in_queue.get() is None
if __name__ == "__main__":
q1 = Queue()
q2 = Queue()
process_1 = Process(target=process1, args=(q1, q2))
process_2 = Process(target=process2, args=(q2, q1, 10))
process_1.start()
process_2.start()
process_2.join()
process_1.join()
q1.close()
q2.close()
q1.join_thread()
q2.join_thread()
您可以使用套接字来完成此任务,甚至可以使用微服务方法(例如通过 rest api 调用)。
@Roberto Trani
从您的解决方案开始,我什至能够使用第三个队列记录两个进程之间发生的通信。
谢谢,我真的很困难,不知道如何解决这个问题。
from multiprocessing import Process, Queue
def process1(in_queue, out_queue, log_queue):
# Receives data, modifies it and sends it back
while True:
data = in_queue.get() # Receive data
if data is None:
log_queue.put(None) # log END
out_queue.put(None) # send feedback about END message
break
data *= 2 # Data modification
print("p1: {}".format(data))
log_queue.put(data) # Log data
out_queue.put(data) # Send data
def process2(in_queue, out_queue, how_many, log_queue):
data = 0
# Receives data, increments it and sends data back
while how_many > 0:
data += 1 # Data modification
print("p2: {}".format(data))
log_queue.put(data) # Log Data
out_queue.put(data) # Send data
how_many -= 1
data = in_queue.get() # Receive data
if data is None:
break
# send END message
log_queue.put(None) # log END
out_queue.put(None)
out_queue.close()
out_queue.join_thread()
assert in_queue.get() is None
if __name__ == "__main__":
q1 = Queue()
q2 = Queue()
q3 = Queue()
logbook = []
process_1 = Process(target=process1, args=(q1, q2, q3))
process_2 = Process(target=process2, args=(q2, q1, 10, q3))
process_1.start()
process_2.start()
process_2.join()
process_1.join()
q1.close()
q2.close()
q1.join_thread()
q2.join_thread()
while True:
data = q3.get()
logbook.append(data)
if data is None:
break
q3.close()
q3.join_thread()
print(logbook)
你能进一步解释一下你所说的微服务方法是什么意思吗?我听说过REST,现在我正试图弄清楚如何在Python中实现这种范式。–
例如,像网络服务一样。您可以提供对模块内服务(函数、方法)的访问。该模块可以通过 REST API 访问,例如使用自上而下的方法作为 OpenApi 规范 (https://en.wikipedia.org/wiki/OpenAPI_Specification )。
我目前正在使用这种方法:设计一个高级接口(模块,每个模块的功能,层次结构和模块的互连);在openapi编辑器(在线:https://editor.swagger.io)的yaml/json文件中使用CRUD(https://en.wikipedia.org/wiki/Create,_read,_update_and_delete)写下该设计以满足REST端点;使用编辑器功能生成python代码(flask);编辑样板代码以实际实现后端功能;运行用于提供对 API 方法的访问的服务器。 您甚至可以将模块转换为 docker 映像以实现可扩展性:我使用此基本映像:https://github.com/tiangolo/uwsgi-nginx-flask-docker/