基本上,我正在尝试在单独的线程中在服务器端生成事件。我有一个应该发出事件的celery.task
,但它的代码永远不会执行。
import json
import time
from celery import Celery
from flask import Flask
from flask import jsonify
from flask import render_template
from flask import request
from flask_socketio import SocketIO
broker_url = "redis://localhost:6379/1"
celery = Celery(broker=broker_url)
app = Flask(__name__)
socketio = SocketIO(app, message_queue=broker_url)
@celery.task
def countdown(n):
print("countdown", n)
for i in range(n+1):
time.sleep(1)
socketio.emit(
"countdown",
{"remaining": n - i},
namespace="/test/"
)
@app.route("/")
def index():
return render_template("index.html")
@app.route("/start_countdown/", methods=["POST"])
def start_countdown():
data = json.loads(request.data.decode())
countdown.delay([int(data["time"])])
return jsonify(time_to_wait=data["time"])
if __name__ == '__main__':
socketio.run(debug=True)
观点反应很好,但任务是沉默的,我无法理解,为什么?
UPD
我像这里一样重新排列了我的代码。文件夹结构完全相同,文件也相同。在app/main
文件夹中,我有额外的tasks.py
文件。
import time
from celery import Celery
from flask_socketio import emit
from app import socketio
from config import broker_url
celery = Celery(broker=broker_url)
@celery.task
def countdown(n):
print(n)
for i in range(n+1):
time.sleep(1)
print("Socket", socketio)
print("Server", socketio.server)
socketio.emit(
"countdown",
{"remaining": n - i},
namespace="/test/"
)
我用celery -A app.main.tasks worker
命令启动芹菜工人。当执行countdown
任务的代码时,它会失败,并出现以下异常:
[2017-10-12 19:04:07,797: WARNING/ForkPoolWorker-1] 13
[2017-10-12 19:04:08,799: WARNING/ForkPoolWorker-1] <flask_socketio.SocketIO object at 0x7f07d2a0fc50>
[2017-10-12 19:04:08,803: ERROR/ForkPoolWorker-1] Task app.main.tasks.countdown[68ae2e43-6ab7-4d52-8b3a-a9aaff46c489] raised unexpected: AttributeError("'NoneType' object has no attribute 'emit'",)
Traceback (most recent call last):
File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 374, in trace_task
R = retval = fun(*args, **kwargs)
File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 629, in __protected_call__
return self.run(*args, **kwargs)
File "/path/to/tasks.py", line 24, in countdown
namespace="/test/"
File "/path/to/venv/lib/python3.4/site-packages/flask_socketio/__init__.py", line 357, in emit
self.server.emit(event, *args, namespace=namespace, room=room,
AttributeError: 'NoneType' object has no attribute 'emit'
由于某种原因,我的任务中的socketio.server
None
,而app/main/events.py
文件中它是适当的对象。看起来在我的任务中socketio
对象没有完全初始化,可能是因为在芹菜过程中执行流程不同,但我不知道如何解决它。
您的芹菜任务是否在单独的 tasks.py 模块中? 例如:
from celery import Celery
celery = Celery('tasks', broker=broker_url)
socketio = SocketIO(app, message_queue=broker_url)
@celery.task
def countdown(n):
print("countdown", n)
for i in range(n+1):
time.sleep(1)
socketio.emit(
"countdown",
{"remaining": n - i},
namespace="/test/"
)
然后,您可以在提示符下启动此工作线程:
celery -A tasks worker --loglevel=info
然后,您应该运行主代码并添加:
from tasks import countdown
并调用将在单独进程中运行的倒计时函数
我只能让它与 Redis 作为代理一起工作,浪费了几个小时尝试使用 RabbitMQ 作为代理,并且无法从芹菜任务中发出消息。请确保使用 Flask SocketIO 构造函数创建一个新实例,而不是主 flask 应用的 socketio 实例。
我在使用 Flask-SocketIO 和 Celery 时遇到了类似的问题。
正如上面提到的其他人,我们无法从 Celery 任务与客户端通信,因为它们是两个不同的过程。
我的解决方案是在 Flask 服务器上创建一个端点,该端点基本上将事件广播到客户端。此终结点将采用如下所示的 json 数据:
{
"event_name": "Event name",
"room": "roomId", // optional field
"data": "<data you want to send>" // this is the data you want to send to the users
"broadcast_secret": <secret> // This would ensure ensure that we the endpoint is not public to everyone. If this secret is invalid then we would not broadcast anything to the client.
}
以下是终结点的外观:
@app.route('/braodcast',methods = ['POST'])
def broadcast_to_client(self):
json_data = self.get_json()
event_name = json_data.get('event_name')
room = json_data.get('room')
data = json_data.get('data')
broadcast_secret = json_data.get('broadcast_secret')
status_code, emitted = 400, False
if event_name and broadcast_secret == os.getenv('BROADCAST_SECRET'):
socketio.emit(event_name, data, room=room)
status_code, emitted = 200, True
return {'emitted': emitted}, status_code
然后,我创建了一个帮助程序函数,该函数使用请求模块从我的芹菜任务向此端点发出后请求:
import requests
def send_socket_event_from_celery(event_name, data=None, room=None):
app_host = os.getenv('APP_HOST')
broadcast_path = f'{app_host}/broadcast'
json_data = {
'event_name': event_name,
'data': data,
'room': room,
'broadcast_secret': os.getenv('BROADCAST_SECRET')
}
requests.post(broadcast_path, json=json_data)
现在在任何芹菜任务中,我需要在套接字 IO 中发出一个事件,我只需调用此方法即可。
send_socket_event_from_celery('sample-event', data, room=some_room)