将外部队列传递到烧瓶路由函数



我正在尝试修改flask请求回调,以便它可以在执行回调时与其他代码通信。这个例子更好地解释了这一点:

from flask import Flask, request
from queue import Queue
flask_input_queue = Queue()
flask_output_queue = Queue()
app = Flask(__name__)

@app.route("/voice", methods=['GET', 'POST'])
def voice():
# The receiver on the other end gets notified we got a request
flask_output_queue.put(str(request))
# This blocks until the external party responds with something
response = flask_input_queue.get()
# But how do the queues end up in the function scope to begin with?
return response

app.run(debug=True)

在这里,外部代码将有一个使用队列进入web服务器的通道。这使我能够在代码的另一部分完全抽象web服务器的概念。

然而,为此,我需要能够以URL以外的方式将信息传递给回调方法。坦率地说,它不一定是一个队列,其他IPC机制也可以正常工作,但它们都依赖于有一种方法将数据传递到回调中。

有办法在烧瓶里这样做吗?

_URLCallbackClassadd_url_rule组合使用,而不是装饰器。_URLCallbackClass获取队列作为实例属性。假设实际的回调函数是_URLCallbackClass的方法,我们将队列导入回调函数。

其余的复杂性只是从提供一个工作示例中产生的。

logging.basicConfig(format='[Thread: %(threadName)s-%(thread)d] %(message)s', level=logging.INFO)                                                                                                             [0/0]
logger = logging.getLogger(__name__)

class ControllableServer(threading.Thread):
class _URLCallbackClass():
def __init__(self, input_queue, output_queue):
self.input_queue = input_queue
self.output_queue = output_queue
def url_callback(self):
self.output_queue.put("[URL callback] I just got called")
response_from_the_queue = self.input_queue.get()
return Response(response_from_the_queue, 200)
def __init__(self, input_queue, output_queue):
super().__init__(daemon=True)
self.input_queue = input_queue
self.output_queue = output_queue
self._flask = Flask(__name__)
def run(self):
callback_class = ControllableServer._URLCallbackClass(self.input_queue, self.output_queue)
self._flask.add_url_rule('/endpoint', 'url_callback', callback_class.url_callback)
logger.info(f"Starting flask")              
self._flask.run()                           

def call_URL_in_separate_thread(url):               
def call_URL(url):                              
logger.info(f"Calling {url}")               
response = requests.get(url)                
logger.info(f"Got response: {response.text}")                                                    
return response.text                        
url_caller_thread = threading.Thread(target=call_URL, args=(url,))                                   
url_caller_thread.start()                       

if __name__ == "__main__":                          
flask_input_queue = Queue()                     
flask_output_queue = Queue()                    
controllable_server = ControllableServer(flask_input_queue, flask_output_queue)                      
controllable_server.start()                     
call_URL_in_separate_thread("http://127.0.0.1:5000/endpoint")                                        
message_from_within_the_callback = flask_output_queue.get()                                          
logger.info(f"Got message from queue: {message_from_within_the_callback}")                           
message_to_the_callback = "I come from the outside !@##$@"                                           
flask_input_queue.put(message_to_the_callback)  
logger.info(f"Sending message to queue: {message_to_the_callback}")                                  

输出:

[Thread: Thread-1-140465413375744] Starting flask
[Thread: Thread-2-140465404983040] Calling http://127.0.0.1:5000/endpoint
* Serving Flask app "basic_flask_passing_variable" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
[Thread: Thread-1-140465413375744]  * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
[Thread: MainThread-140465450415936] Got message from queue: [URL callback] I just got called
[Thread: MainThread-140465450415936] Sending message to queue: I come from the outside !@##$@
[Thread: Thread-3-140465396041472] 127.0.0.1 - - [03/Mar/2020 18:33:32] "GET /endpoint HTTP/1.1" 200 -
[Thread: Thread-2-140465404983040] Got response: I come from the outside !@##$@

最新更新