上下文:
我在视图命令中有一个函数,在表单上发布HTTP后,通过后台任务脚本发送一些varaibale。这个后台脚本处理了很多API调用,并将其转换为要转换的JSON。因为在呈现下一个html页面之前,这可能需要很长时间(因为API调用的方式(。我决定将这一个作为带线程的后台任务。最终,如果可能的话,我希望稍后添加一个que。
问题:
但问题是,尽管我清楚地将其设置为守护进程线程和所有内容。代码未被执行。我的Python控制台日志甚至没有显示正在执行的代码,所以我在这里做错了什么。
如果您想要后台任务本身,请查看订单66。
"""
Routes and views for the flask application.
"""
from datetime import datetime
from flask import render_template, jsonify
from FlaskWebPODTracer import app
import json
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from flask import request, redirect, url_for, flash
# The main script that has to happen in the background.
import classified.background_script.backgroundtask as Order66
from threading import Thread
import random
import string
from celery import Celery
from flask_mail import Mail, Message
import requests
API_VERSION = "v1/"
Succes_message = "U ontvangt nog een mail wanneer de data klaar is. Bedankt om PODTracer te gebruiken."
Fail_message_1 = "Wijzig de parameters van uw aanvraag en probeer opnieuw."
Fail_message_2 = "Blijft dit voorkomen contacteer dan support."
Fail_message = Fail_message_1 + 'n' + Fail_message_2
@app.route('/handle_data/')
@app.route('/handle_data/', methods=['POST', 'GET'])
def handle_data():
#if request.method == 'GET':
# return render_template(
# "request-completed.html",
# title=
# "Aanvraag ingediend",
# objects = jsonobjects,
# year=datetime.now().year,
# message='Uw aanvraag is voltooid.'
# )
if request.method == 'POST':
email = request.form['inputEmail']
stringpart = request.form['city']
sper_year = int(request.form["Speryear"])
urlpart = request.form["api-url-input"]
url = "Classified" + API_VERSION + urlpart
print(url)
response = requests.get(url)
if response.status_code == 200:
jsonobjects = len(response.json())
task = Thread(group=None, target=None,name=Order66.main, args=(stringpart,url,sper_year,email), daemon=True)
task.start()
state = "succesvol."
message_body = Succes_message
else:
jsonobjects = 0;
state = "onsuccesvol."
message_body = Fail_message
return render_template(
"request-posted.html",
respstate = state,
body = message_body,
title=
"Aanvraag ingediend",
objects = jsonobjects,
year=datetime.now().year,
message='Uw aanvraag is ' + state
)
# TODO RUN THIS IN BACKGROUND
# app.route('/request-completed')
@app.route('/handle_data_fail')
def handle_data_fail():
jsonobjects = 0
state = "onsuccesvol."
message_body = Fail_message
return render_template(
"request-posted.html",
respstate = state,
body = message_body,
title=
"Aanvraag ingediend",
objects = jsonobjects,
year=datetime.now().year,
message='Uw aanvraag is ' + state
)
正如评论中所讨论的,这里有一个使用RabbitMQ和Flask的事件驱动系统的过于简单的例子。
您需要的依赖项:
(flask-rabbitmq) ➜ flask-rabbitmq pip freeze
click==8.0.3
Flask==2.0.2
itsdangerous==2.0.1
Jinja2==3.0.3
MarkupSafe==2.0.1
pika==1.2.0
Werkzeug==2.0.2
尝试使用以下命令创建RabbitMQ docker容器:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
你的简单烧瓶应用程序会是这样的:
from flask import Flask
from flask import request
import json
from RabbitMQPublisher import publish
app = Flask(__name__)
@app.route('/publish', methods=['POST'])
def index():
if request.method == 'POST':
publish(json.dumps(request.json))
return 'Done'
if __name__ == '__main__':
app.run()
你的RabbitMQPublisher.py
应该是这样的:
import pika
def publish(data: str):
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='0.0.0.0', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test', exchange_type='fanout')
channel.queue_declare(queue='', exclusive=True)
channel.basic_publish(exchange='test', routing_key='', body='{"data": %s}'%(data))
connection.close()
最后你的script.py
会是这样的:
import pika
import json
from time import sleep
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='0.0.0.0', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(exchange='test', queue='')
def callback(ch, method, properties, body):
body = json.loads(body.decode().replace("'", '"'))
print(body)
channel.basic_consume(
queue='', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
在上面代码中的callback
函数中,您可以指定您是逻辑,当您的逻辑完成时,您可以再次使用RabbitMQ调用flask端的模式,或者使用requests
进行事件do http调用。那将是你的选择。