我们都知道,使用 asyncio 可以大大提高套接字服务器的性能,显然,如果我们可以利用 CPU 中的所有内核(可能通过多处理模块或os.fork()
等),事情会变得更加出色。
我现在正在尝试构建一个多核套接字服务器演示,其中异步套接字服务器侦听每个内核并全部绑定到一个端口。只需创建一个异步服务器,然后使用os.fork()
,让进程竞争性地工作。
但是,当我尝试分叉时,单核精细代码遇到了一些麻烦。似乎在 epoll 选择器模块中从不同进程注册相同的文件描述符存在一些问题。
我在下面显示了一些代码,有人可以帮助我吗?
下面是使用 asyncio 的简单、逻辑清晰的 echo 服务器代码:
import os
import asyncio #,uvloop
from socket import *
# hendler sends back incoming message directly
async def handler(loop, client):
with client:
while True:
data = await loop.sock_recv(client, 64)
if not data:
break
await loop.sock_sendall(client, data)
# create tcp server
async def create_server(loop):
sock = socket(AF_INET ,SOCK_STREAM)
sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
sock.bind(('',25000))
sock.listen()
sock.setblocking(False)
return sock
# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
while True:
client ,addr = await loop.sock_accept(sock)
loop.create_task(handler(loop ,client))
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()
在我尝试分叉之前,在套接字被抛出之后,在服务器开始服务之前,它工作正常。(此逻辑在同步 - 基于线程的代码中工作正常。
当我尝试这样做时:
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
pid = os.fork()
if pid <= 0: # fork process as the same number as
break # my cpu cores
loop.create_task(serving(loop, sock))
loop.run_forever()
理论上分叉的过程是同一个套接字吗?并在相同的事件循环中运行?然后工作就好了?
但是我收到这些错误消息:
Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
key = self._selector.get_key(fd)
File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/test/temp1.py", line 23, in serving
client ,addr = await loop.sock_accept(sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
self._sock_accept(fut, False, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
self.add_reader(fd, self._sock_accept, fut, True, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
return self._add_reader(fd, callback, *args)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
(handle, None))
File "/usr/local/lib/python3.7/selectors.py", line 359, in register
self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists
蟒蛇版本 3.7.3,
我对正在发生的事情完全感到困惑。
有人可以帮忙吗?谢谢
根据跟踪器问题,不支持分叉现有的 asyncio 事件循环并尝试从多个进程中使用它。然而,根据 Yury 对同一问题的评论,多处理可以通过在开始循环之前分叉来实现,因此在每个子循环中运行完全独立的 asyncio 循环。
您的代码实际上证实了这种可能性:当create_server
async def
时,它不等待任何内容,也不使用loop
参数。因此,我们可以通过以下方式实现 Yury 的方法:使create_server
成为常规函数,删除loop
参数,并在os.fork()
之前调用它,并且仅在分叉后运行事件循环:
import os, asyncio, socket, multiprocessing
async def handler(loop, client):
with client:
while True:
data = await loop.sock_recv(client, 64)
if not data:
break
await loop.sock_sendall(client, data)
# create tcp server
def create_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 25000))
sock.listen()
sock.setblocking(False)
return sock
# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
while True:
client, addr = await loop.sock_accept(sock)
loop.create_task(handler(loop, client))
sock = create_server()
for num in range(multiprocessing.cpu_count() - 1):
pid = os.fork()
if pid <= 0: # fork process as the same number as
break # my cpu cores
loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()