最近,我使用Python 3.x在Ubuntu中使用芹菜和Flower(用于仪表板和任务可视化)。首先,我安装了兔子服务器,凉爽的芹菜和花朵。然后,我创建了一个称为tasks.py
的脚本,其中包含以下内容:
from celery import Celery
# py-advanced-message-queuing-protocol
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//')
@app.task
def intensive_sum1(num):
val = sum(x**4 for x in range(num))
return val
@app.task
def intensive_sum2(num):
val = sum(x**4 for x in range(num))
return val
@app.task
def intensive_sum3(num):
val = sum(x**4 for x in range(num))
return val
然后我创建了一个包含
的脚本run.py
from tasks import intensive_sum1, intensive_sum2, intensive_sum3
import time
start = time.time()
result1 = intensive_sum1.delay(100000000)
result2 = intensive_sum2.delay(100000000)
result3 = intensive_sum3.delay(100000000)
print(result1.get(), result2.get(), result3.get())
end = time.time()
print('time: ', end - start)
start = time.time()
result1 = sum(x**4 for x in range(100000000))
result2 = sum(x**4 for x in range(100000000))
result3 = sum(x**4 for x in range(100000000))
print(result1, result2, result3)
end = time.time()
print('time: ', end - start)
在运行后者之前,我开始了两个不同的终端,并将目录更改为两个脚本的位置。然后,我在一个终端中运行sudo celery -A tasks flower
,在另一个端子中运行celery -A tasks worker --loglevel=info
。事实证明,(惊喜)芹菜可以将每个任务分配给单个核心,从而节省大量时间。当然,这次节省大量功能只能预期,因为较小的功能会产生线程的开销,这不会带来任何好处。
这使我考虑了另一个问题。假设我没有一台机器,我有3台连接到同一WiFi路由器的机器。我可以使用ifconfig
命令来为这些Ubuntu机器中的每台计算机进行IP地址。让我们说,其中一台机器是一台包含main.py
脚本的主计算机,该脚本使用OpenCV-Python捕获对象捕获实时图像。然后,它将每个图像序列化并将其作为消息发送给两个工具机器。这两台工具机器都可以独立工作,并且都可以通过相同的图像来解除序列化。一台工具机器进行猫分类并返回猫的概率,另一台机器进行狗的分类并返回狗的概率。一台工具可能比另一个工具得出的结论需要更长的时间。但是,对于该特定帧,主机需要等待两个分类结果,然后再将某些结果叠加在该特定帧上。本能地,我被认为是主机需要在前进之前检查两个作业是否准备就绪(e.g. result_worker_one.ready() == result_worker_two.ready() == True
)。我该如何实现这种行为?我如何在主机中序列化一个RGB图像并在工具机器中将其序列化?backend
和broker
需要什么机器?如何将其设置为客户端服务器体系结构?
您对在多个计算机上分配作业是正确的。实际上,这是芹菜的主要目的之一。
-
要检查是否完成了两个异步作业,您可以在芹菜中使用组和和弦选项。假设您的两个芹菜任务如下:
@app.task def check_dog(): #dog_classification code @app.task def check_cat(): #cat classification code
您可以将这些任务分组在一起,然后使用和弦(和弦是一个仅在组中所有任务完成执行后才执行的任务)才能在两个功能执行后转到下一步。在下面显示的回调函数中包含您需要的任何内容。相关文档可以在此处找到:http://docs.celeryproject.org/en/master/userguide/canvas.html#groups
chord([check_dog(),check_cat()])(callback)
-
以序列化部分来看一下:将图像传递到芹菜任务
-
要回答问题的第三部分,芹菜固有地遵循客户端服务器体系结构以支持并行计算。每当您调用芹菜任务时,它将在您设置的消息代理上放置消息(在您的情况下使用了RabbitMQ)。此消息将包含有关要运行的任务以及所有必需的参数的信息。梅塞奇队列将向跨不同计算机的芹菜工人传递消息。工人收到消息后,工人将执行消息描述的任务。因此,如果您想在多个计算机中分配任务,您要做的就是在每台计算机中启动一个芹菜工人,该工具在主计算机中倾听您的消息队列。您可以按以下方式配置工人
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://<username>:<password>@<ip of task queue host>')
确保您向每个芹菜工人提供任务文件,因为传递给工人的消息不包含源代码,而仅包含任务名称本身。