Django 频道从 Celery 任务发送群组消息.异步事件循环在所有异步任务完成之前停止



我目前陷入了一个特别棘手的问题,我会尽力解释它。

我有一个 Django 项目,它的主要目的是从数据库快速执行排队的任务。我使用 Celery 和 Celerybeat 通过 Django 频道来实现这一点,以实时更新我的模板和响应。

芹菜工作线程是一个具有相当数量的线程的 gevent worker 池。

我的任务(简体版(:

@shared_task
def exec_task(action_id):
# execute the action
action = Action.objects.get(pk=action_id)
response = post_request(action)
# update action status
if response.status_code == 200:
action.status = 'completed'
else:
action.status = 'failed'
# save the action to the DB
action.save()
channel_layer = get_channel_layer()
status_data = {'id': action.id, 'status': action.status}
status_data = json.dumps(status_data)
try:
async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
except:
event_loop = asyncio.get_running_loop()
future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', {'type': 'propergate_status', 'data': status_data}), event_loop)
result = future.result()

我的错误:

[2019-10-03 18:47:59,990:警告/主进程] 排队操作:25

[2019-10-03 18:48:02,206:警告/主进程] C:\users\jack\documents\GitHub\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: 运行时警告:从未等待
协程"AsyncToSync.main_wrap"self._read_event = io_class(fileno, 1(

运行时警告:启用 tracemalloc 以获取对象分配回溯

[2019-10-03 18:48:02,212:警告/主进程] c:\用户\杰克\文档\GitHub\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: 运行时警告:协程"BaseEventLoop.shutdown_asyncgens"从未出现 等待 self._read_event = io_class(fileno, 1( 运行时警告:

最初在我将操作保存到数据库后,我刚刚调用:

async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})

但是我一直收到运行时错误,因为如果已经有一个asyncio事件循环已经在运行,则无法使用async_to_sync,如第 61 行所示。所以我有多个gevent线程试图async_to_sync非常接近,不断在链接中抛出错误。

这让我得到了这个精彩的答案和当前版本的exec_task,它在向 Django 频道组发送消息时有 98% 的成功率,但我真的需要它是 100%。

这里的问题是,在我添加的协程有机会完成之前,asyncio 事件循环偶尔会停止,我一直在调整我的代码,使用 asyncio 和事件循环 api,但我要么破坏我的代码,要么得到更糟糕的结果。我有一种感觉,这可能与 Asgirefasync_to_sync函数提前关闭循环有关,但它很复杂,我几天前才开始使用 python 异步。

欢迎任何反馈、评论、提示或修复!

干杯。

最后我无法解决问题,而是选择使用通道AsyncHttpConsumer发送组消息的替代解决方案。这不是最佳的,但它可以工作并将工作流程保留在频道库中。

消费者:

class celeryMessageConsumer(AsyncHttpConsumer):
async def handle(self, body):
# send response
await self.send_response(200, b"Recieved Loud and Clear", headers=[
(b"Content-Type", b"text/plain"),
])
# formating url encoded string into json
body_data = urllib.parse.unquote_plus(body.decode("utf-8"))
body_data = json.loads(body_data)
id = body_data['data']['id']
await self.channel_layer.group_send(
f"group_{id}",
{
'type': 'propergate.data',
'data': body_data['data']
}
)

路由:

application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter(
myApp.routing.websocket_urlpatterns
)
),
'http': URLRouter([
path("celeryToTemplate/", consumers.celeryMessageConsumer),
re_path('genericMyAppPath/.*', AsgiHandler),
]),
})

Http 请求:

data = json.dumps({'id': id, 'status': status})
response = internal_post_request('http://genericAddress/celeryToTemplate/', data)
if response.status_code == 200:
# phew
pass
else:
# whoops
pass

请求:

def internal_post_request(request_url, payload):
headers={
'Content-Type': 'application/json'
}
response = requests.post(request_url, data=payload, headers=headers)
return response

嗨,我目前遇到了您的确切问题,能够将已完成的芹菜任务的消息发送到客户端至关重要。

我之前能够通过使用模型方法的信号来group_send,例如:

def SyncLogger(**kwargs):
""" a syncronous function to instigate the websocket layer
to send messages to all clients in the project """
instance = kwargs.get('instance')
# print('instance {}'.format(instance))
args = eval(instance.args)
channel_layer = channels.layers.get_channel_layer()
async_to_sync(channel_layer.group_send)(
args ['room'],
{
"type": "chat.message",
"operation": args['operation'],
"state": instance.state,
"task": instance.task
})

和信号

post_save.connect(SyncLogger, TaskProgress)

更新只要有event_loop,我就可以发送消息 无论使用者是否异步,这都有效

@shared_task()
def test_message():
channel_layer = get_channel_layer()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(channel_layer.group_send('sync_chat', {
'type': 'chat.message',
'operation': 'operation',
'state': 'state',
'task': 'task'
}))

相关内容

  • 没有找到相关文章

最新更新