芹菜可以作为有状态/可恢复的任务协同运行协同程序吗



我目前正在研究在视频处理后端使用Celery。本质上,我的问题如下:

  1. 我有一个前端web服务器,它可以同时处理大量视频流(大约数千个)
  2. 每个流必须独立处理,并并行处理
  3. 流处理可以分为两种类型的操作:
    1. 逐帧操作(不需要有关前一帧或后一帧的信息的计算)
    2. 流级操作(对有序的相邻帧的子集进行计算)

给定第3点,我需要在整个过程中维护和更新一个有序的框架结构,并将该结构的子部分上的农场计算提供给Celery工人。最初,我想把事情组织如下:

[frontend server]  -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]

其思想是celery worker 1执行长时间运行的任务,这些任务主要是I/O绑定的。本质上,这些任务将执行以下操作:

  1. 从前端服务器读取帧
  2. 从帧的base64表示解码帧
  3. 在前面提到的有序数据结构(collections.deque对象,如当前所示)中排队

任何CPU绑定的操作(,即图像分析)都会发送到celery worker 2

我的问题如下:

我想把协同程序作为一个任务来执行,这样我就有了一个长时间运行的任务,我可以从中yield,这样就不会阻止celery worker 1的操作。换句话说,我希望能够做一些类似于的事情

def coroutine(func):
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start
@coroutine
def my_taks():
    stream = deque()  # collections.deque
    source = MyAsynchronousInputThingy()  # something i'll make myself, probably using select
    while source.open:
        if source.has_data:
            stream.append(Frame(source.readline()))  # read data, build frame and enqueue to persistent structure
        yield  # cooperatively interrupt so that other tasks can execute

是否有一种方法可以使基于协同程序的任务无限期运行,理想情况下生成yield ed的结果

Eventlet背后的主要思想是,您想要编写同步代码,就像线程一样,socket.recv()应该阻塞当前线程,直到下一条语句。这种样式在调试时非常易于阅读、维护和推理。为了使事情在幕后变得有效和可扩展,Eventlet神奇地用绿色线程和epoll/kqueue/etc机制取代了看似阻塞的代码,以便在适当的时候唤醒这些绿色线程。

因此,您只需要尽快执行eventlet.monkey_patch()(例如模块中的第二行),并确保在MyInputThingy中使用纯Python套接字操作。忘记异步,只需像编写线程一样编写普通的阻塞代码。

Eventlet使同步代码再次变得良好。

相关内容

  • 没有找到相关文章

最新更新