如何跨多个服务器/工作程序管理WebSocket



aiohttp内置了对websocket的支持。它非常简单,效果很好。

文档中示例的简化版本是:

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # Async iterate the messages the client sends
    async for message in ws:
        ws.send_str('You sent: %s' % (message.data,))
    print('websocket connection closed')

在该示例中,ws是对与客户端的websocket连接的引用。我可以很容易地将这些引用放在request.app中,就像@Crandel在这里所做的那样(即全局状态(,但不能放在生产应用程序中,因为每个应用程序服务器(甚至每个工作程序(都有自己的app实例。

这方面有公认的模式吗?还有别的办法吗?

注意:我指的不是会话。我指的是关系。当服务器B等中的应用程序代码中发生事件时,我想向连接到服务器a的客户端发送一条消息。

如果我理解正确,您希望有多个websocket服务器,每个服务器都连接了多个客户端,但您希望能够与所有连接的客户端进行潜在的通信。

这里有一个例子,它创建了三个琐碎的服务器——一个大写回显、一个随机报价和一天中的时间——然后向所有连接的客户端发送一条广播消息。也许这里面有一些有用的想法

粘贴框:https://pastebin.com/xDSACmdV

#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.
In response to stackoverflow question:
https://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers
Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser
import aiohttp
from aiohttp import web
__author__ = "Robert Harder"
__email__ = "rob@iharder.net"
__license__ = "Public Domain"

def main():
    # Create servers
    cap_srv = CapitalizeEchoServer(port=9990)
    rnd_srv = RandomQuoteServer(port=9991)
    tim_srv = TimeOfDayServer(port=9992)
    # Queue their start operation
    loop = asyncio.get_event_loop()
    loop.create_task(cap_srv.start())
    loop.create_task(rnd_srv.start())
    loop.create_task(tim_srv.start())
    # Open web pages to test them
    webtests = [9990, 9991, 9991, 9992, 9992]
    for port in webtests:
        url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
        webbrowser.open(url)
    print("Be sure to click 'Connect' on the webpages that just opened.")
    # Queue a simulated broadcast-to-all message
    def _alert_all(msg):
        print("Sending alert:", msg)
        msg_dict = {"alert": msg}
        cap_srv.broadcast_message(msg_dict)
        rnd_srv.broadcast_message(msg_dict)
        tim_srv.broadcast_message(msg_dict)
    loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")
    # Run event loop
    loop.run_forever()

class MyServer:
    def __init__(self, port):
        self.port = port  # type: int
        self.loop = None  # type: asyncio.AbstractEventLoop
        self.app = None  # type: web.Application
        self.srv = None  # type: asyncio.base_events.Server
    async def start(self):
        self.loop = asyncio.get_event_loop()
        self.app = web.Application()
        self.app["websockets"] = []  # type: [web.WebSocketResponse]
        self.app.router.add_get("/", self._websocket_handler)
        await self.app.startup()
        handler = self.app.make_handler()
        self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
        print("{} listening on port {}".format(self.__class__.__name__, self.port))
    async def close(self):
        assert self.loop is asyncio.get_event_loop()
        self.srv.close()
        await self.srv.wait_closed()
        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
        await self.app.shutdown()
        await self.app.cleanup()
    async def _websocket_handler(self, request):
        assert self.loop is asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.app["websockets"].append(ws)
        await self.do_websocket(ws)
        self.app["websockets"].remove(ws)
        return ws
    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            pass
    def broadcast_message(self, msg: dict):
        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            ws.send_json(msg)

class CapitalizeEchoServer(MyServer):
    """ Echoes back to client whatever they sent, but capitalized. """
    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            cap = ws_msg.data.upper()
            ws.send_str(cap)

class RandomQuoteServer(MyServer):
    """ Sends a random quote to the client every so many seconds. """
    QUOTES = ["Wherever you go, there you are.",
              "80% of all statistics are made up.",
              "If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]
    def __init__(self, interval: float = 10, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self.interval = interval
    async def do_websocket(self, ws: web.WebSocketResponse):
        async def _regular_interval():
            while self.srv.sockets is not None:
                quote = random.choice(RandomQuoteServer.QUOTES)
                ws.send_json({"quote": quote})
                await asyncio.sleep(self.interval)
        self.loop.create_task(_regular_interval())
        await super().do_websocket(ws)  # leave client connected here indefinitely

class TimeOfDayServer(MyServer):
    """ Sends a message to all clients simultaneously about time of day. """
    async def start(self):
        await super().start()
        async def _regular_interval():
            while self.srv.sockets is not None:
                if int(time.time()) % 10 == 0:  # Only on the 10 second mark
                    timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
                    self.broadcast_message({"timestamp": timestamp})
                await asyncio.sleep(1)
        self.loop.create_task(_regular_interval())

if __name__ == "__main__":
    main()

所以我只熟悉Socket。节点中的IO,但使用Socket水平扩展websocket相当容易。IO.

套接字可以与会话一起提供,因此每个会话都由特定的服务器管理。这样可以很容易地保存每个打开的套接字的状态,并在所有服务器之间实现负载平衡。

以下是适用于Python的SocketIO:

https://pypi.python.org/pypi/socketIO-client

以下是一篇关于如何将会话附加到redis存储的非常好的文章,以使其更快,并使跨服务器的负载平衡更易于管理。

如何与Socket共享会话。IO 1.x和Express 4.x?

我知道这并不能回答你关于aiohttp的问题,但希望这能让你更好地了解套接字的工作方式。

编辑:写入节点-

插座中。IO这真的很简单,它有很多功能可以以各种不同的方式广播消息。

例如,如果你想向每个聊天室的每个人发送一条消息。举个例子,每个有插座的人都可以轻松地写。

socket.broadcast.emit('WARNING', "this is a test");

假设你的房间是开放的,你可以通过一个名为.to()的简单功能只向房间里的人广播消息。示例我有一个名为"BBQ"的房间:

socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');

这将给BBQ频道的每个人发信息——来吃点东西吧!

编辑:编辑:

这是一篇关于Socket的精彩文章。IO有效,请确保您阅读了函数更新版本的第二个答案。它比他们的文档更容易阅读。

向除发送方(Socket.io(之外的所有客户端发送响应

据我所知,这也是python实现中的工作方式。为了便于使用,我当然会将其用于websocket。aiohttp看起来非常强大,但要么没有这个功能,隐藏在文档中,要么只在代码中编写,还没有任何文档。

更新(2017年2月(

Channels(幸运的是(没有合并到Django中。它可能仍然是一个伟大的项目,但它并不真正属于Django本身。

此外,我强烈建议大家看看Postgres对pub/sub的相对较新的内置支持。它可能会胜过其他任何东西,在aiohttp上构建一个自定义解决方案,使用Postgres作为支持服务,可能是你的最佳选择

原件

虽然不是aiohttp,但Django Channels很可能会合并到Django 1.10中,它以一种非常直观的方式解决了这个问题,它是由Django迁移的作者Andrew Godwin编写的。

Django Channels通过在Django应用程序前面创建路由层,抽象了"多服务器上的多进程"的概念。该路由层与后端(例如Redis(通信,以维护进程之间的可共享状态,并使用新的ASGI协议来方便处理HTTP请求和WebSockets,同时将每个请求委派给各自的"消费者"(例如,附带了HTTP请求的内置处理程序,您可以为WebSockets编写自己的处理程序(。

Django Channels有一个名为Groups的概念,它处理问题的"广播"性质;也就是说,它允许服务器上发生的事件向该组中的客户端触发消息,无论它们连接到相同还是不同的进程或服务器。

IMHO,Django Channels很可能被抽象成一个更通用的Python库。还有其他几个Python库可以实现类似Go的通道,但在撰写本文时,没有什么值得注意的可以提供网络透明度;通道在进程和服务器之间进行通信的能力。

相关内容

  • 没有找到相关文章

最新更新