我目前正在研究在视频处理后端使用Celery。本质上,我的问题如下:
- 我有一个前端web服务器,它可以同时处理大量视频流(大约数千个)
- 每个流必须独立处理,并并行处理
- 流处理可以分为两种类型的操作:
- 逐帧操作(不需要有关前一帧或后一帧的信息的计算)
- 流级操作(对有序的相邻帧的子集进行计算)
给定第3点,我需要在整个过程中维护和更新一个有序的框架结构,并将该结构的子部分上的农场计算提供给Celery工人。最初,我想把事情组织如下:
[frontend server] -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]
其思想是celery worker 1
执行长时间运行的任务,这些任务主要是I/O绑定的。本质上,这些任务将仅执行以下操作:
- 从前端服务器读取帧
- 从帧的base64表示解码帧
- 在前面提到的有序数据结构(
collections.deque
对象,如当前所示)中排队
任何CPU绑定的操作(,即图像分析)都会发送到celery worker 2
。
我的问题如下:
我想把协同程序作为一个任务来执行,这样我就有了一个长时间运行的任务,我可以从中yield
,这样就不会阻止celery worker 1
的操作。换句话说,我希望能够做一些类似于的事情
def coroutine(func):
@wraps(func)
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start
@coroutine
def my_taks():
stream = deque() # collections.deque
source = MyAsynchronousInputThingy() # something i'll make myself, probably using select
while source.open:
if source.has_data:
stream.append(Frame(source.readline())) # read data, build frame and enqueue to persistent structure
yield # cooperatively interrupt so that other tasks can execute
是否有一种方法可以使基于协同程序的任务无限期运行,理想情况下生成yield
ed的结果
Eventlet背后的主要思想是,您想要编写同步代码,就像线程一样,socket.recv()
应该阻塞当前线程,直到下一条语句。这种样式在调试时非常易于阅读、维护和推理。为了使事情在幕后变得有效和可扩展,Eventlet神奇地用绿色线程和epoll/kqueue/etc机制取代了看似阻塞的代码,以便在适当的时候唤醒这些绿色线程。
因此,您只需要尽快执行eventlet.monkey_patch()
(例如模块中的第二行),并确保在MyInputThingy
中使用纯Python套接字操作。忘记异步,只需像编写线程一样编写普通的阻塞代码。
Eventlet使同步代码再次变得良好。