我有一个类,它包含一个函数,我希望能够通过调用flask resful端点来调用该函数。有没有一种方法可以定义一个异步函数来等待/订阅要调用的端点?如果需要,我也可以对flask应用程序进行更改(但不能切换到SocketIO(,或者编写某种异步请求函数。我只能使用基本的Anaconda 3.7库,并且我没有安装或可用的任何其他消息代理。
class DaemonProcess:
def __init__(self):
pass
async def await_signal():
signal = await http://ip123/signal
self.(process_signal) # do stuff with signal
就上下文而言,这不是过程的主要目标。我只是想能够使用它远程告诉我的流程,或者通过UI优雅地或强制地关闭工作流程。我唯一想到的另一个想法是反复对数据库表进行ping,看看是否插入了信号,但时间至关重要,在我看来,这需要以太短的间隔进行ping,异步方法会更受欢迎。数据库将是SQLite3,并且它似乎不支持update_hook回调。
以下是发送信号并进行处理的示例模式:
import asyncio
import aiotools
class DaemonProcess
async def process(reader, writer):
data = await reader.read(100)
writer.write(data)
print(f"We got a message {data} - time to do something about it.")
await writer.drain()
writer.close()
@aiotools.server
async def worker(loop, pidx, args):
server = await asyncio.start_server(echo, '127.0.0.1', 8888,
reuse_port=True, loop=loop)
print(f'[{pidx}] started')
yield # wait until terminated
server.close()
await server.wait_closed()
print(f'[{pidx}] terminated')
def start(self):
aiotools.start_server(myworker, num_workers=4)
if __name__ == '__main__':
# Run the above server using 4 worker processes.
d = DaemonProcess()
d.start()
如果您将其保存在文件中,例如process.py,您应该能够启动它:
python3 process.py
现在,一旦您在后台有了这个守护进程,您就应该能够ping它(请参阅下面的示例客户端(:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
现在,在您的Flask视图中,您应该能够调用:
asyncio.run(tcp_echo_client('I want my daemon to do something for me'))
请注意,所有这些都使用了localhost127.0.0.1
和端口8888
,所以除非您有自己的端口和IP,否则这些都是可用的,然后您需要相应地配置它们。
还要注意aiotools的使用,它是一个提供一组常见asyncio模式(守护进程等(的模块