将来包装队列



我正在Python 3.7中编写一个Tornado Web服务器,以显示multiprocessing库运行的进程的状态。

以下代码有效,但我希望能够使用Tornado的内置库来完成,而不是在线程库中进行黑客攻击。我还不知道如何在queue.get期间不阻止龙卷风。我认为正确的解决方案是在将来封装get调用。我试了好几个小时了,但还没想好怎么做。

我的多处理脚本内部:

class ProcessToMonitor(multiprocessing.Process)
def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()
def run():
while True:
# do stuff
self.queue.put(value)

然后,在我的龙卷风脚本中

class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()
def open(self):
self.connections.add(self)
def close(self):
self.connections.remove(self)
@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]
def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)
if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)
threading.Thread(target=worker)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()

queue.get不是一个阻塞函数,它只是等待队列中有项目,以防队列为空。我可以从您的代码中看到,queue.get非常适合while循环中的用例。

我想你可能用错了。您必须使worker函数成为协程(async/await语法(:

async def worker():
...
while True:
message = await queue.get()
...

但是,如果您不想等待项目并希望立即进行,则其替代方案是queue.get_nowait

这里需要注意的一点是,如果队列为空,queue.get_nowait将引发一个名为QueueEmpty的异常。因此,您需要处理该异常。

示例:

while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)

正如您所看到的,如果队列为空,您将不得不使用暂停while循环一段时间,以防止它淹没系统。

那么,为什么不首先使用queue.get呢?

相关内容

  • 没有找到相关文章

最新更新