如何从asyncio.open_connection并行使用读写器



如何从asyncio.open_connection提供的异步reader, writer对中并行读写?

我在两个不同的循环上尝试了asyncio.open_connection,比如:

async def read():
reader, writer = await asyncio.open_connection('127.0.0.1', 5454)
while True:
readval = await reader.readline()
print(f"read {readval}")
async def write():
reader, writer = await asyncio.open_connection('127.0.0.1', 5454)
while True:
self.wsem.acquire()
msg = self.wq.popleft()
print("Writing " + msg)
writer.write(msg.encode())
await writer.drain()

threading.Thread(target=lambda: asyncio.run(write())).start()
threading.Thread(target=lambda: asyncio.run(read())).start()

但似乎有时写线程会耗尽读线程中的读内容,而且工作不好。

然后我尝试在两个循环之间共享读取器和写入器,但它抛出了一个异常

Exception in thread Thread-8:
Traceback (most recent call last):
File "C:UsersLenovoAppDataLocalProgramsPythonPython39libthreading.py", line 954, in _bootstrap_inner
self.run()
File "C:UsersLenovoAppDataLocalProgramsPythonPython39libthreading.py", line 892, in run
self._target(*self._args, **self._kwargs)
File "C:UsersLenovoPycharmProjectstestape-adb-adapteradapterdevice_socket.py", line 89, in <lambda>
threading.Thread(target=lambda: asyncio.run(read())).start()
File "C:UsersLenovoAppDataLocalProgramsPythonPython39libasynciorunners.py", line 44, in run
return loop.run_until_complete(main)
File "C:UsersLenovoAppDataLocalProgramsPythonPython39libasynciobase_events.py", line 642, in run_until_complete
return future.result()
File "C:UsersLenovoPycharmProjectstestape-adb-adapteradapterdevice_socket.py", line 72, in read
readval = await self.reader.readline()
File "C:UsersLenovoAppDataLocalProgramsPythonPython39libasynciostreams.py", line 540, in readline
line = await self.readuntil(sep)
File "C:UsersLenovoAppDataLocalProgramsPythonPython39libasynciostreams.py", line 632, in readuntil
await self._wait_for_data('readuntil')
File "C:UsersLenovoAppDataLocalProgramsPythonPython39libasynciostreams.py", line 517, in _wait_for_data
await self._waiter
RuntimeError: Task <Task pending name='Task-2' coro=<DeviceSocket.connect.<locals>.read() running at C:UsersLenovoPycharmProjectstestape-adb-adapteradapterdevice_socket.py:72> cb=[_run_until_complete_cb() at C:UsersLenovoAppDataLocalProgramsPythonPython39libasynciobase_events.py:184]> got Future <Future pending> attached to a different loop
async def read():
connection_opened.wait()
while True:
print(f"reading")
if self.reader.at_eof():
continue
readval = await self.reader.readline()
print(f"read {readval}")
self.rq.append(readval.decode())
self.rsem.release(1)
async def write():
self.reader, self.writer = await asyncio.open_connection('127.0.0.1', 5454)
connection_opened.set()
while True:
self.wsem.acquire()
msg = self.wq.popleft()
print("Writing " + msg)
self.writer.write(msg.encode())
await self.writer.drain()
connection_opened = threading.Event()
threading.Thread(target=lambda: asyncio.run(write())).start()
threading.Thread(target=lambda: asyncio.run(read())).start()

我认为这应该是一个简单且相当常见的用例。正确的方法是什么?

我建议您将线程函数更改为以下内容:

t = self.loop.create_task(self.write)

并以结尾

loop.run_until_complete(t)

因为我缺少self.wqself.wsem函数,也不确定它们的含义,所以我无法重现错误消息。希望这能帮你解决问题。

最新更新