Python 3.4 通信流委托 - 非阻塞 recv 和发送 - 数据从 asyncio 传出



我正在RPi上组装一个客户端服务器应用程序。 它有一个主线程,可以创建一个通信线程来与iOS设备通信。 主线程创建一个 asyncio 事件循环和一个 sendQ 和一个 recvQ,并将它们作为参数传递给通信线程中的 commsDelegate main 方法。

我遇到的麻烦是当iOS设备连接时,它需要在数据可用时立即从此Python应用程序接收未经请求的数据,并且它需要能够将数据发送到Python应用程序。 因此,发送和接收必须是非阻塞的。

那里有很棒的回显服务器教程。 但就服务器对数据做一些有用的事情而言,很少。

任何人都可以帮助我让 asyncio 读取我的发送队列并在主线程排队后立即转发数据? 我收到了很棒的工作。

主线程创建一个循环并启动通信线程:

commsLoop = asyncio.new_event_loop()
commsMainThread = threading.Thread(target=CommsDelegate.commsDelegate, args=(commsInQ,commsOutQ,commsLoop,commsPort,), daemon=True)
commsMainThread.start()

然后,模块 CommsDelegate 中的 asyncio 应该将循环作为 loop.run_forever() 服务器任务从套接字流读取和写入,并使用队列发送接收消息返回到主线程。

这是我到目前为止的代码。 我发现如果我为协议生成器创建一个工厂,我可以将队列名称传递给它,这样消息的接收现在都很好。 当它们从客户端到达时,它们被排队_nowait并且主线程可以很好地接收它们。

我只需要 asyncio 来处理来自主线程的出站消息队列,因为它们到达 sendQ,这样它就可以将它们发送到连接的客户端。

#!/usr/bin/env python3.6
import asyncio
class ServerProtocol(asyncio.Protocol):
def __init__(self, loop, recvQ, sendQ):
self.loop = loop
self.recvQ = recvQ
self.sendQ = sendQ
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
self.recvQ.put_nowait(message.rstrip())
# Needs work... I think the queue.get_nowait should be a co-ro maybe? 
def unknownAtTheMo():
dataToSend = sendQ.get_nowait()
print('Send: {!r}'.format(message))
self.transport.write(dataToSend)
# Needs work to close on request from client or server or exc...
def handleCloseSocket(self):
print('Close the client socket')
self.transport.close()
# async co-routine to consume the send message Q from Main Thread
async def consume(sendQ):
print("In consume coro")
while True:
outboundData = await self.sendQ.get()
print("Consumed", outboundData)
self.transport.write(outboundData.encode('ascii'))
def commsDelegate(recvQ, sendQ, loop, port):
asyncio.set_event_loop(loop)
# Connection coroutine - Create a factory to assist the protocol in receipt of the queues as args
factory = lambda: ProveItServerProtocol(loop, recvQ, sendQ)
# Each client connection will create a new protocol instance
connection = loop.run_until_complete(loop.create_server(factory, host='192.168.1.199', port=port))
# Outgoing message queue handler
consumer = asyncio.ensure_future(consume(sendQ))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the connection
connection.close()
# Close the loop
loop.close()

我将所有数据消息作为json发送,CommsDelegate执行编码和解码,然后将它们中继为asis。

更新:异步线程似乎对传入流量运行良好。 服务器接收 json 并通过队列中继它 - 非阻塞。

发送工作后,我将在线程上拥有一个可重用的黑盒服务器。

我可以看到你的方法有两个问题。首先,所有客户端都使用相同的recvsend队列,因此consume协程无法知道要回复谁。

第二个问题与您使用队列作为同步和异步世界之间的桥梁有关。请参阅代码的这一部分:

await self.sendQ.get()

如果sendQ是常规队列(来自queue模块),则此行将失败,因为sendQ不是协程。另一方面,如果sendQ是一个asyncio.Queue,主线程将无法使用sendQ.put,因为它是一个协程。可以使用put_nowait,但在 asyncio 中不能保证线程安全。相反,您必须使用loop.call_soon_threadsafe:

loop.call_soon_threadsafe(sendQ.put_nowait, message)

通常,请记住 asyncio 被设计为作为主应用程序运行。它应该在主线程中运行,并通过ThreadPoolExecutor与同步代码进行通信(请参阅loop.run_in_executor)。

有关多线程的更多信息,请参阅 asyncio 文档。您可能还想看看 asyncio stream API,它提供了一个更好的接口来使用 TCP。

最新更新