Django芹菜工人将实时状态和结果消息发送到前端



在我正在运行异步任务的Django应用中,并希望向用户显示进度,错误等。如果存在错误,则应将用户重定向到一个页面,在该页面上,需要其他输入或某些操作来解决问题。从芹菜工作回到前端的最佳沟通方法是什么?

这是伪代码中的基本结构:

# views.py
from tasks import run_task
def view_task():
    run_task.delay()
    return render(request, 'template.html')
# tasks.py
from compute_module import compute_fct
@shared_task
def run_task():
    result = compute_fct()
    # how to catch status update messages from compute_module while compute_fct is running??
    if result == 'error':
        handle_error()
    else:
        handle_succes()     
# compute_module
import pandas as pd
def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()
    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}

我理想地想要什么:

  • compute_module.py模块使用Python本机记录器。通过将职责分开,我想保持日志记录尽可能通用,并使用标准的Python/Django记录器。但是它们似乎并不是为了将消息发送到前端。
  • 芹菜任务以某种方式处理日志,而不是在stdout上显示它们将它们重定向到推动器
  • 前端JS显示和处理消息

我不知道芹菜工人和前端之间可能存在标准的沟通方式。这种情况必须经常发生,我很惊讶它很难实施。在某种程度上,应为此设计RabbitMQ消息队列或AWS SNS。以下是我看过的资源,但感觉不太好,但也许我只是感到困惑。

记录:这似乎更多地是关于在服务器端登录,而不是向用户发送消息

  • http://docs.celeryproject.org/en/latest/userguide/tasks.html#logging
  • https://docs.djangoproject.com/en/2.0/topics/logging/
  • http://oddbird.net/2017/04/17/async-notifications/
  • https://www.google.com/search?q=celery worker send message message to front end

芹菜凸轮似乎是关于管理监视任务,而不是向用户发送消息

  • http://docs.celeryproject.org/en/latest/userguide/monitoring.html

我喜欢推动器,但我不想让compute_module.py处理它。例如,我宁愿不在compute_module.py中进行任何pusher.com集成。猜猜我可以通过已经实例化的推动器对象,以便模块可以按消息,但是我又希望它是通用的

  • https://blog.pusher.com/improve-user-experience-app-real time-progress-bar-tutorial/
  • https://blog.pusher.com/django-pusherth/

编辑:现在移至django-channels,效果很好,但比下面的解决方案更复杂。

上一个:

好的,下面是我现在如何解决它的伪代码。基本上,我使用https://pusher.com/docs/javascript_quick_start,服务器端将实例化对象传递到compute_module中。一个缺点是推动器的消息是字体,所以我必须在LogPusher中做一些额外的工作才能将它们存储在DB中,这是另一天...

在我的实际实施中,我还通过$.post() ajax调用$(document).ready()触发任务,因为小任务完成了如此之快,因为用户永远不会看到推动器消息,因为连接没有建立(返回到历史悠久的消息问题)。/p>

我上面没有提到的另一条替代路线是https://channels.readthedocs.io/en/latest/

[编辑]另一个解决方案是具有Django实现的服务器序列事件,未对其进行测试。但是它看起来很适合单向更新,例如从服务器到客户端(VS Websockets双向)。您将需要像Redis PubSub这样的消息传递系统来获取服务器SSE路由的更新。

通过推动器从Django服务器进行的前端更新:

# views.py
from tasks import run_task
def view_task():
    run_task.delay('event')
    return render(request, 'template.html', 'pusher_event':'event')
    
# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct
class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event
        
    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))
@shared_task
def run_task(pusher_event):
    
    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)
    # how to catch status update messages from compute_module while compute_fct is running??
    if result == 'error':
            log_pusher.send('status':'error')
    else:
            log_pusher.send('status':'success')
            
# compute_module.py
import pandas as pd
def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send('status':'loading file')
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send('status':'computing')
    val = df['col'].mean()
    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}
        
# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings 
def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }
        
# template.html
<script>
    
var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});
var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});
</script>

我设法获得实时状态的唯一方法就是将一些SQL Writes/api调用放入任务本身。使用任务的返回值进行操作要容易得多,因为您只能编写自定义任务类。

我不完全确定使用Django的工作方式,但应该看起来像这样。

class CustomTask(celery.Task):
    def __call__(self, *args, **kwargs):
        self.start_time = time.time()
    def on_success(self, retval, task_id, args, kwargs):
        do_success_stuff()
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        do_failure_stuff()
@shared_task(base=CustomTask)
def do_stuff():
    return create_widgets()

完整列表可以在此处找到:http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers

有一个称为芹菜制作的库,可能会有所帮助芹菜库

他还撰写了一篇有关手动执行此操作的博客文章:有关芹菜进度条的博客

最新更新