如何为 NATS 客户端使用 Python Tornado 设置延迟、非阻塞自停止



我试图让龙卷风在超时期后停止,而不会阻止任何现有功能。我可能错过了龙卷风约定,但无论我使用 spawn_callback、任务还是线程,我似乎都在阻止主循环。

首先,我这样做的原因是,我想在客户端应用程序中使用世界著名的 NATS 消息总线来发布消息(而不是通常的直接 HTTP 功能(,然后等待订阅的响应。异步行为的典型问题,官方的NATS Python客户端使用Tornado,所以我也在尝试使用它。

我怀疑我的问题与理解tornado.gen.coroutine装饰器如何与线程一起工作有关。

下面是我的代码片段。如果有人注意到我的明显问题,我将不胜感激。谢谢!

class Delayed(Thread):
def __init__(self, callback=None, timeout=2, *args, **kwargs):
super(Delayed, self).__init__(*args, **kwargs)
self.callback = callback
self.timeout = timeout
def run(self):
time.sleep(self.timeout)
if self.callback != None:
self.callback()
def timeout_task(timeout_secs=2):
time.sleep(timeout_secs)
ioloop.IOLoop.instance().stop()
yield
@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
parser.add_argument('CommandType')
...
parser.add_argument('-s', '--servers', default=[], action='append')
args = parser.parse_args()
try:
timeout=args.wait
servers = args.servers
queue = ""
...
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
data = funct_to_get_data()
nc = NATS()
opts = { "servers": servers }
yield nc.connect(**opts)
def self_stop():
ioloop.IOLoop.instance().stop()
def handler(msg):
print("[Received: {0}] {1}".format(msg.subject, msg.data))
print("Subscribed to '{0}'".format(subject))
future = nc.subscribe(subject, queue, handler)
sid = future.result()
yield nc.publish(subject, data)
yield nc.flush()
print("Published to '{0}'".format(subject))
# HERE is where I'd like to setup a non-blocking timeout that
# will stop Tornado.
# spawn_callback blocks and prevents future from receiving anything.
#ioloop.IOLoop.current().spawn_callback(lambda: timeout_task(timeout))
# Task blocks and prevents future from receiving anything. 
yield tornado.gen.Task(timeout_task, timeout)
# Straight attempt at a Thread will block as well.
Delayed(self_stop(), timeout).start()
except Exception, e:
print(e)
show_usage_and_die()
if __name__ == '__main__':
main()
ioloop.IOLoop.instance().start()

好的,我最终只使用了线程。计时器 Python 支持延迟函数调用。这是重要的代码。这可能不是很好的Python或使用Tornado,但我现在要使用它。

def self_stop():
ioloop.IOLoop.instance().stop()
@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
...
parser.add_argument('-w', '--wait', default=2, type=int)
...
args = parser.parse_args()
try:
timeout=args.wait
t = Timer(timeout, self_stop)
t.start()
...

if __name__ == '__main__':
main()
ioloop.IOLoop.instance().start()

最新更新