如何使用pika和connexion初始化RabbitMQ使用者



我正试图建立一个Python微服务,通过RabbitMQ接收消息,同时为Kubernetes健康检查提供/healthREST端点。我将pika用于RabbitMQ使用者,将connexion用于REST端点。

但是,当我在main()中启动RabbitMQ使用者时,连接应用程序不会启动。

python-app.py

#!/usr/bin/env python
import pika, sys, os, connexion
from flask import Flask, request, jsonify
app = connexion.FlaskApp(__name__, specification_dir='./')
def main():
# Connection
...
# Exchange and queues
...
def callback(ch, method, properties, body):
...
channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages.')
channel.start_consuming()
app.run(port=8080, use_reloader=False)
@app.route('/api/v1/health', methods=['GET'])
def return_health():
message = {'status':'Healthy! <3'}
return jsonify(message)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

输出:

[*] Waiting for messages.

如何正确初始化这两个组件?我需要使用线程吗?

我已经通过在一个单独的线程中初始化RabbitMQ使用者来解决这个问题:

#!/usr/bin/env python
import pika, sys, os, threading
from flask import Flask, request, jsonify
app = Flask(__name__)
def start_rmq_connection():
# Connection
...
# Exchange and queues
...
def callback(ch, method, properties, body):
...
channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages.')
channel.start_consuming()
@app.route('/api/v1/health', methods=['GET'])
def return_health():
message = {'status':'Healthy! <3'}
return jsonify(message)
if __name__ == '__main__':
try:
thread_1 = threading.Thread(target=start_rmq_connection)
thread_1.start()
thread_1.join(0)
app.run()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

相关内容

  • 没有找到相关文章

最新更新