我正在尝试在Flask服务器中实现异步RPC客户端。的想法是,每个请求生成一个线程一个uuid,每个请求都是要等到有反应RpcClient
queue
属性对象正确的uuid
。
问题是两个请求中有一个失败。我认为这可能是多线程的问题,但我不知道它是从哪里来的。Bug可以在这里看到
使用调试打印,似乎在_on_response
回调中接收到具有正确uuid的消息,并在此实例中正确更新queue
属性,但/rpc_call/<payload>
端点内的queue
属性不同步(因此queue[uuid]
在RpcClient
回调中具有response
的值,但在端点范围内仍然具有None
)。
我代码:
from flask import Flask, jsonif
from gevent.pywsgi import WSGIServer
import sys
import os
import pika
import uuid
import time
import threading
class RpcClient(object):
"""Asynchronous Rpc client."""
internal_lock = threading.Lock()
queue = {}
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='rabbitmq'))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')
# Create all the queue and bind them to the corresponding routing key
self.channel.queue_declare('request', durable=True)
result = self.channel.queue_declare('answer', durable=True)
self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
self.callback_queue = result.method.queue
.
thread = threading.Thread(target=self._process_data_events)
thread.setDaemon(True)
thread.start()
def _process_data_events(self):
self.channel.basic_consume(self.callback_queue, self._on_response, auto_ack=True)
while True:
with self.internal_lock:
self.connection.process_data_events()
time.sleep(0.1)
def _on_response(self, ch, method, props, body):
"""On response we simply store the result in a local dictionary."""
self.queue[props.correlation_id] = body
def send_request(self, payload):
corr_id = str(uuid.uuid4())
self.queue[corr_id] = None
with self.internal_lock:
self.channel.basic_publish(exchange='kaldi_expe',
routing_key="kaldi_expe.web.request",
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=corr_id,
),
body=payload)
return corr_id
def flask_app():
app = Flask("kaldi")
@app.route('/', methods=['GET'])
def server_is_up():
return 'server is up', 200
@app.route('/rpc_call/<payload>')
def rpc_call(payload):
"""Simple Flask implementation for making asynchronous Rpc calls. """
corr_id = app.config['RPCclient'].send_request(payload)
while app.config['RPCclient'].queue[corr_id] is None:
#print("queue server: " + str(app.config['RPCclient'].queue))
time.sleep(0.1)
return app.config['RPCclient'].queue[corr_id]
if __name__ == '__main__':
while True:
try:
rpcClient = RpcClient()
app = flask_app()
app.config['RPCclient'] = rpcClient
print("Rabbit MQ is connected, starting server", file=sys.stderr)
app.run(debug=True, threaded=True, host='0.0.0.0')
except pika.exceptions.AMQPConnectionError as e:
print("Waiting for RabbitMq startup" + str(e), file=sys.stderr)
time.sleep(1)
except Exception as e:
worker.log.error(e)
exit(e)
我找到bug的来源了:
app.run(debug=True, threaded=True, host='0.0.0.0')
行的debug=True
从开头重新启动服务器。
然后从头开始重新启动整个脚本。因此,另一个rpclient被初始化并从同一队列中消费。问题是前一个线程也在运行。这会导致两个rpclient从同一个线程中消费,其中一个实际上是无用的。