python 3:如何在使用async任务时等待async函数中的回调



AWS IoT python Sdk有一个函数publishAsync
该函数的签名是publishAsync(主题、有效负载、QoS、ackCallback=None(

我想将其扭曲为协同程序并使用asyncio。

我想做的是:

  1. 连接到AWS
  2. 异步发布10条消息
  3. 断开

我不知道该如何将该函数包装为异步函数。

async def asyncPublish(self,
msg:str,
topic:str,
QoS=1):
# the publishAckFn
def internalPubAckHandler(mid):
print(json.dumps({
'messageID':mid,
'acknowledged':True
}))
return True
pass
# publish to the topic asynchronously
messageID = self.awsIoTClient.publishAsync(topic,msg,QoS,ackCallback=internalPubAckHandler)
print(json.dumps({
'messageID':messageID,
'topic':topic,
'payload':msg,
'QoS':QoS,
'async':True
}))
pass

---- in my main file
tasks = []
for i in range(10):
tasks += asyncPublish('test','test')
pass
loop = asyncio.get_event_loop()
loop.set_debug(True)  # debug mode
gw1.connect()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
gw1.disconnect()  

但当我这样做时,断开连接将比internalPubAckHandler更快地被调用我不会得到任何承认。

最后,我找到了解决方案。我们可以使用期货。

Future对象用于将基于低级回调的代码与高级异步/等待代码桥接。参考

如果函数的签名是这样的:

def functionWithCallback(args,callback)

并且我们需要将其封装到等待回调的异步函数中,我们可以执行以下操作:

async def asyncVersion(args):
# one way of creating a future
loop = asyncio.get_running_loop()
future = loop.create_future()
def futureCallback():
# do something here and then signal the future
...
loop.call_soon_threadsafe(future .set_result,time.time())
pass
# call your function
functionWithCallback(args,futureCallback)
# wait for the callback
await future

# do other stuff after the callback is done
.....
pass

最新更新