一个脚本(datamanger.py)
from multiprocessing import Manager
q = Manager().Queue()
其他两个脚本是这样的
from datamanager import q
import time
while True:
time.sleep(1)
q.put(1)
from datamanager import q
while True:
if not q.empty():
data = q.get()
print(data)
是否有可能只使用队列而不是像kafka这样的消息队列来实现这个功能?
下面是Ahmed的答案的另一个选择,它使用了一个更简单的单例。
server.py:
from multiprocessing.managers import BaseManager
from multiprocessing import Queue
address = ('127.0.0.1', 50000) # you can change this
authkey = b"abc" # you should change this
class SharedQueue:
def __init__(self):
self._queue = Queue()
self._queue.put("Something really important!")
def __call__(self):
return self._queue
if __name__ == "__main__":
# Register our queue
shared_queue = SharedQueue()
BaseManager.register("get_queue", shared_queue)
# Start server
manager = BaseManager(address=address, authkey=authkey)
srv = manager.get_server()
srv.serve_forever()
client.py
from server import address, authkey
from multiprocessing.managers import BaseManager
if __name__ == "__main__":
BaseManager.register("get_queue")
manager = BaseManager(authkey=authkey, address=address)
manager.connect()
queue = manager.get_queue()
print(queue.get())
为了使队列处于活动状态,而不绑定到任何一个进程,您需要生成一个管理它的服务器,该服务器应该有一个单例队列,并且每个与它联系的人都将获得该队列的代理,服务器代码如下:
# queue_server.py
from multiprocessing.managers import SyncManager
from multiprocessing.managers import BaseProxy
import multiprocessing
address = ('127.0.0.1', 50000) # you can change this
authkey = b"abc" # you should change this
class SingletonQueue:
instance = None
def __new__(cls, *args, **kwargs):
if SingletonQueue.instance is None:
SingletonQueue.instance = object.__new__(SingletonQueue)
return SingletonQueue.instance
else:
return SingletonQueue.instance
def get_queue(self):
if not hasattr(self, "queue"):
manager = SyncManager(address=address, authkey=authkey)
manager.connect()
self.queue = manager.Queue()
return self.queue
class CustomQueueProxy(BaseProxy):
_exposed_ = ['get_queue']
def get_queue(self):
queue = self._callmethod('get_queue')
return queue
def connect_manager():
multiprocessing.current_process().authkey = authkey
manager = SyncManager(address=address, authkey=authkey)
manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
manager.connect()
return manager
def start_server():
manager = SyncManager(address=address, authkey=authkey)
manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
server = manager.get_server()
print(f"running on ip = {server.address[0]}, and port {server.address[1]}")
multiprocessing.current_process().authkey = authkey
server.serve_forever()
if __name__ == "__main__":
start_server()
您需要运行服务器,在运行服务器后您可以使用客户端连接到它,客户端代码将看起来像这样:
import multiprocessing
import queue_server # the server python file
manager = queue_server.connect_manager()
queue: multiprocessing.Queue = manager.SingletonQueue().get_queue()
queue.put(1)
print(queue.get())
请注意,这将python进程的身份验证密钥设置为某个值,因此您不能使用它来使用不同的身份验证密钥进行多个连接,您必须有一个固定的身份验证密钥。
编辑:如果将来有人读到这篇文章,我可能会选择Charchit Agarwal的答案,或者两种答案的混合。这取决于你是否想要允许跨网络/docker边界的连接,我的答案是允许的。