aiohttp内置了对websocket的支持。它非常简单,效果很好。
文档中示例的简化版本是:
async def handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# Async iterate the messages the client sends
async for message in ws:
ws.send_str('You sent: %s' % (message.data,))
print('websocket connection closed')
在该示例中,ws
是对与客户端的websocket连接的引用。我可以很容易地将这些引用放在request.app
中,就像@Crandel在这里所做的那样(即全局状态(,但不能放在生产应用程序中,因为每个应用程序服务器(甚至每个工作程序(都有自己的app
实例。
这方面有公认的模式吗?还有别的办法吗?
注意:我指的不是会话。我指的是关系。当服务器B等中的应用程序代码中发生事件时,我想向连接到服务器a的客户端发送一条消息。
如果我理解正确,您希望有多个websocket服务器,每个服务器都连接了多个客户端,但您希望能够与所有连接的客户端进行潜在的通信。
这里有一个例子,它创建了三个琐碎的服务器——一个大写回显、一个随机报价和一天中的时间——然后向所有连接的客户端发送一条广播消息。也许这里面有一些有用的想法
粘贴框:https://pastebin.com/xDSACmdV
#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.
In response to stackoverflow question:
https://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers
Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser
import aiohttp
from aiohttp import web
__author__ = "Robert Harder"
__email__ = "rob@iharder.net"
__license__ = "Public Domain"
def main():
# Create servers
cap_srv = CapitalizeEchoServer(port=9990)
rnd_srv = RandomQuoteServer(port=9991)
tim_srv = TimeOfDayServer(port=9992)
# Queue their start operation
loop = asyncio.get_event_loop()
loop.create_task(cap_srv.start())
loop.create_task(rnd_srv.start())
loop.create_task(tim_srv.start())
# Open web pages to test them
webtests = [9990, 9991, 9991, 9992, 9992]
for port in webtests:
url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
webbrowser.open(url)
print("Be sure to click 'Connect' on the webpages that just opened.")
# Queue a simulated broadcast-to-all message
def _alert_all(msg):
print("Sending alert:", msg)
msg_dict = {"alert": msg}
cap_srv.broadcast_message(msg_dict)
rnd_srv.broadcast_message(msg_dict)
tim_srv.broadcast_message(msg_dict)
loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")
# Run event loop
loop.run_forever()
class MyServer:
def __init__(self, port):
self.port = port # type: int
self.loop = None # type: asyncio.AbstractEventLoop
self.app = None # type: web.Application
self.srv = None # type: asyncio.base_events.Server
async def start(self):
self.loop = asyncio.get_event_loop()
self.app = web.Application()
self.app["websockets"] = [] # type: [web.WebSocketResponse]
self.app.router.add_get("/", self._websocket_handler)
await self.app.startup()
handler = self.app.make_handler()
self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
print("{} listening on port {}".format(self.__class__.__name__, self.port))
async def close(self):
assert self.loop is asyncio.get_event_loop()
self.srv.close()
await self.srv.wait_closed()
for ws in self.app["websockets"]: # type: web.WebSocketResponse
await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
await self.app.shutdown()
await self.app.cleanup()
async def _websocket_handler(self, request):
assert self.loop is asyncio.get_event_loop()
ws = web.WebSocketResponse()
await ws.prepare(request)
self.app["websockets"].append(ws)
await self.do_websocket(ws)
self.app["websockets"].remove(ws)
return ws
async def do_websocket(self, ws: web.WebSocketResponse):
async for ws_msg in ws: # type: aiohttp.WSMessage
pass
def broadcast_message(self, msg: dict):
for ws in self.app["websockets"]: # type: web.WebSocketResponse
ws.send_json(msg)
class CapitalizeEchoServer(MyServer):
""" Echoes back to client whatever they sent, but capitalized. """
async def do_websocket(self, ws: web.WebSocketResponse):
async for ws_msg in ws: # type: aiohttp.WSMessage
cap = ws_msg.data.upper()
ws.send_str(cap)
class RandomQuoteServer(MyServer):
""" Sends a random quote to the client every so many seconds. """
QUOTES = ["Wherever you go, there you are.",
"80% of all statistics are made up.",
"If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]
def __init__(self, interval: float = 10, *kargs, **kwargs):
super().__init__(*kargs, **kwargs)
self.interval = interval
async def do_websocket(self, ws: web.WebSocketResponse):
async def _regular_interval():
while self.srv.sockets is not None:
quote = random.choice(RandomQuoteServer.QUOTES)
ws.send_json({"quote": quote})
await asyncio.sleep(self.interval)
self.loop.create_task(_regular_interval())
await super().do_websocket(ws) # leave client connected here indefinitely
class TimeOfDayServer(MyServer):
""" Sends a message to all clients simultaneously about time of day. """
async def start(self):
await super().start()
async def _regular_interval():
while self.srv.sockets is not None:
if int(time.time()) % 10 == 0: # Only on the 10 second mark
timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
self.broadcast_message({"timestamp": timestamp})
await asyncio.sleep(1)
self.loop.create_task(_regular_interval())
if __name__ == "__main__":
main()
所以我只熟悉Socket。节点中的IO,但使用Socket水平扩展websocket相当容易。IO.
套接字可以与会话一起提供,因此每个会话都由特定的服务器管理。这样可以很容易地保存每个打开的套接字的状态,并在所有服务器之间实现负载平衡。
以下是适用于Python的SocketIO:
https://pypi.python.org/pypi/socketIO-client
以下是一篇关于如何将会话附加到redis存储的非常好的文章,以使其更快,并使跨服务器的负载平衡更易于管理。
如何与Socket共享会话。IO 1.x和Express 4.x?
我知道这并不能回答你关于aiohttp的问题,但希望这能让你更好地了解套接字的工作方式。
编辑:写入节点-
插座中。IO这真的很简单,它有很多功能可以以各种不同的方式广播消息。
例如,如果你想向每个聊天室的每个人发送一条消息。举个例子,每个有插座的人都可以轻松地写。
socket.broadcast.emit('WARNING', "this is a test");
假设你的房间是开放的,你可以通过一个名为.to()
的简单功能只向房间里的人广播消息。示例我有一个名为"BBQ"的房间:
socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');
这将给BBQ频道的每个人发信息——来吃点东西吧!
编辑:编辑:
这是一篇关于Socket的精彩文章。IO有效,请确保您阅读了函数更新版本的第二个答案。它比他们的文档更容易阅读。
向除发送方(Socket.io(之外的所有客户端发送响应
据我所知,这也是python实现中的工作方式。为了便于使用,我当然会将其用于websocket。aiohttp看起来非常强大,但要么没有这个功能,隐藏在文档中,要么只在代码中编写,还没有任何文档。
更新(2017年2月(
Channels(幸运的是(没有合并到Django中。它可能仍然是一个伟大的项目,但它并不真正属于Django本身。
此外,我强烈建议大家看看Postgres对pub/sub的相对较新的内置支持。它可能会胜过其他任何东西,在aiohttp上构建一个自定义解决方案,使用Postgres作为支持服务,可能是你的最佳选择
原件
虽然不是aiohttp,但Django Channels很可能会合并到Django 1.10中,它以一种非常直观的方式解决了这个问题,它是由Django迁移的作者Andrew Godwin编写的。
Django Channels通过在Django应用程序前面创建路由层,抽象了"多服务器上的多进程"的概念。该路由层与后端(例如Redis(通信,以维护进程之间的可共享状态,并使用新的ASGI协议来方便处理HTTP请求和WebSockets,同时将每个请求委派给各自的"消费者"(例如,附带了HTTP请求的内置处理程序,您可以为WebSockets编写自己的处理程序(。
Django Channels有一个名为Groups的概念,它处理问题的"广播"性质;也就是说,它允许服务器上发生的事件向该组中的客户端触发消息,无论它们连接到相同还是不同的进程或服务器。
IMHO,Django Channels很可能被抽象成一个更通用的Python库。还有其他几个Python库可以实现类似Go的通道,但在撰写本文时,没有什么值得注意的可以提供网络透明度;通道在进程和服务器之间进行通信的能力。