ZeroMQ:负载平衡多个worker和一个master



假设我有一个主进程,它划分要并行处理的数据。假设有1000个数据块和100个节点来运行计算。

有没有办法使用 REQ/REP 模式让所有 worker 都保持忙碌?我尝试了指南中的负载均衡器模式,但对于单个客户端,sock.recv() 会一直阻塞,直到它收到来自某个 worker 的响应。

下面的代码在 zmq 指南的负载均衡器示例基础上稍作修改,它启动了一个客户端、10 个 worker,以及位于两者中间的负载均衡器/代理。我怎样才能让所有 worker 同时工作?

from __future__ import print_function
from multiprocessing import Process
import zmq
import time
import uuid
import random
def client_task():
    """Basic request-reply client using a REQ socket.

    Sends 100 sequential requests to the broker frontend and blocks on
    each reply (REQ sockets enforce strict send/recv lockstep, which is
    exactly why a single client cannot keep many workers busy).
    """
    socket = zmq.Context().socket(zmq.REQ)
    # pyzmq on Python 3 requires the socket identity to be bytes, not str.
    socket.identity = str(uuid.uuid4()).encode("ascii")
    socket.connect("ipc://frontend.ipc")
    # Send request, get reply
    for i in range(100):
        print("SENDING: ", i)
        # send() accepts only bytes on Python 3; 'WORK' (str) raises TypeError.
        socket.send(b"WORK")
        msg = socket.recv()
        print(msg)
def worker_task():
    """Worker task, using a REQ socket to do load-balancing.

    Registers with the broker by sending READY, then loops forever:
    receive an addressed request, simulate some work, and send the reply
    back through the broker so it can be routed to the original client.
    """
    socket = zmq.Context().socket(zmq.REQ)
    # pyzmq on Python 3 requires the socket identity to be bytes, not str.
    socket.identity = str(uuid.uuid4()).encode("ascii")
    socket.connect("ipc://backend.ipc")
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        # Simulate a variable amount of work per request.
        time.sleep(random.randint(1, 4))
        # identity is already bytes; the original b"OK : " + str(...) would
        # raise TypeError on Python 3 (cannot concatenate bytes and str).
        socket.send_multipart([address, b"", b"OK : " + socket.identity])

def broker():
    """LRU-queue broker between a ROUTER frontend (clients) and a ROUTER
    backend (workers), after the zguide "lbbroker" example.

    Workers announce themselves with a READY message; the broker hands each
    client request to the least-recently-used idle worker and relays worker
    replies back to the originating client.
    """
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Initialize main loop state
    workers = []  # queue of idle worker identities, in LRU order
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)
    while True:
        sockets = dict(poller.poll())
        if backend in sockets:
            # Handle worker activity on the backend.
            # Frame layout: [worker_id, empty, payload...] where payload is
            # either the literal READY or [client_id, empty, reply].
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)
    # Clean up (unreachable: the loop above never exits)
    backend.close()
    frontend.close()
    context.term()
def main():
    """Spawn the broker, the client(s), and the workers as separate processes."""
    NUM_CLIENTS = 1
    NUM_WORKERS = 10

    def start(task, *args):
        # Fire-and-forget launcher; returns the handle so a caller could
        # join() or terminate() the process later if needed.
        process = Process(target=task, args=args)
        process.start()
        return process

    start(broker)
    for _ in range(NUM_CLIENTS):
        start(client_task)
    for _ in range(NUM_WORKERS):
        start(worker_task)


if __name__ == "__main__":
    main()

我想有不同的方法来做到这一点:

例如,您可以使用 threading 模块从单个客户端并发发起所有请求,如下所示:
import threading  # needed by this snippet; it is not imported anywhere above

result_list = []  # Collected (index, reply) pairs, shared across threads.
rlock = threading.RLock()  # Guards result_list against concurrent appends.


def client_thread(client_url, request, i):
    """Send one request over a dedicated REQ socket and record the reply.

    Each thread gets its own socket — ZeroMQ sockets are not thread-safe,
    but the shared Context instance is, so it is reused across threads.
    """
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)
    socket.send(request.encode())
    reply = socket.recv()
    with rlock:
        result_list.append((i, reply))
    return
def client_task():
    """Fan out one thread per task so all requests are in flight at once."""
    # tasks = list with all your tasks (placeholder from the original answer)
    url_client = "ipc://frontend.ipc"
    threads = []
    for i in range(len(tasks)):
        thread = threading.Thread(target=client_thread,
                                    args=(url_client, tasks[i], i,))
        thread.start()
        threads.append(thread)
    # Join all workers: without this the caller may read result_list
    # before the replies have arrived.
    for thread in threads:
        thread.join()

——您也可以利用像 asyncio 这样的事件循环库(pyzmq 提供了子模块 zmq.asyncio,另外还有提供更高抽象层次的 aiozmq 库)。在这种情况下,您会按顺序把请求发送给 worker,但不会在每个响应上阻塞(因此不会让主循环空等),并在响应返回主循环时收集结果。示例如下:

import asyncio
import zmq.asyncio
async def client_async(request, context, i, client_url):
    """Send one request (REQ) to the ROUTER broker and await its reply.

    A fresh socket is created per call and closed before returning, so
    each coroutine owns its own REQ/REP round trip.
    """
    req_socket = context.socket(zmq.REQ)
    req_socket.setsockopt_string(zmq.IDENTITY, str(i))
    req_socket.connect(client_url)
    await req_socket.send(request.encode())
    answer = await req_socket.recv()
    req_socket.close()
    return answer

async def run(loop):
    """Launch every request concurrently and gather the replies in order."""
    # tasks = list full of tasks
    url_client = "ipc://frontend.ipc"
    ctx = zmq.asyncio.Context()
    pending = [
        asyncio.ensure_future(client_async(tasks[i], ctx, i, url_client))
        for i in range(len(tasks))
    ]
    responses = await asyncio.gather(*pending)
    return responses
# NOTE(review): zmq.asyncio.install() is deprecated in modern pyzmq (>=17),
# where zmq.asyncio.Context works without it — confirm the target version.
zmq.asyncio.install()
# Drive the event loop to completion and collect all replies.
loop = asyncio.get_event_loop()
results = loop.run_until_complete(run(loop))

我没有实际测试过这两个代码片段,但它们都改编自我在与您的问题类似的 zmq 配置中使用过的代码(仅做了适配问题场景的修改)。

相关内容

最新更新