到Tornado服务器的多个异步HTTP连接



我有一个龙卷风服务器,我正在尝试使其同步。我有一个客户端,它可以同时向服务器发出异步请求。它每5秒用心跳对服务器进行一次ping,其次,它会在可能的时候对作业发出GET请求。

在服务器端,有一个线程安全队列,其中包含作业。如果队列为空,它将阻塞20秒。我希望它保持连接并阻止20秒,当它返回时,它会向客户端写入"No job"。一旦作业可用,它就应该立即将其写入客户端,因为queue.get((会返回。我希望在该请求被阻止时,心跳继续在后台发生。在这里,我从同一个客户端向服务器发出两个异步请求。

这是我构建的一个示例项目,它在某种程度上模拟了我的问题。

服务器:

import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen
q = Queue()

class HeartBeatHandler(tornado.web.RequestHandler):
@gen.coroutine
def post(self):
print("Heart beat")

class JobHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
print("Job")
try:
job = yield q.get(block=True, timeout=20)
self.write(job)
except Exception as e:
self.write("No job")

def make_app():
return tornado.web.Application([
(r"/heartbeat", HeartBeatHandler),
(r"/job", JobHandler),
])

if __name__ == "__main__":
app = make_app()
app.listen(8888)
try:
tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.current().stop()

客户:

import asyncio
from tornado import httpclient, gen

@gen.coroutine
def heartbeat_routine():
while True:
http_client = httpclient.AsyncHTTPClient()
heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
body="")
try:
yield http_client.fetch(heartbeat_request)
yield asyncio.sleep(5)
except httpclient.HTTPError as e:
print("Heartbeat failed!nError: {}".format(str(e)))
http_client.close()

@gen.coroutine
def worker_routine():
while True:
http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
try:
response = yield http_client.fetch(job_request)
print(response.body)
except httpclient.HTTPError as e:
print("Heartbeat failed!nError: {}".format(str(e)))
http_client.close()

if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(heartbeat_routine())
asyncio.ensure_future(worker_routine())
loop.run_forever()

问题:

  1. 问题是心跳也会在这20秒内停止而queue.get((阻塞。我不想要
  2. 正如您在我的客户端中看到的,我将请求超时设置为180秒。但是龙卷风似乎从来都不起作用。如果将queue.get((timeout增加到20以上秒,则返回错误代码,说明请求超时
  1. 如果使用线程安全队列,则必须使用IOLoop线程的非阻塞操作。相反,在线程池中运行它们:

    job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
    

    或者,您可以使用Tornado的异步(但线程不安全(队列,并在需要与另一个线程的队列交互时使用IOLoop.add_callback

  2. AsyncHTTPClient构造函数有一些魔力,它试图在可能的情况下共享现有实例,但这意味着构造函数参数只有在第一次有效。worker_routine正在拾取由heartbeat_routine创建的默认实例。添加force_instance=True以确保在worker_routine中获得新的客户端(完成后调用.close()(

最新更新