如何在烧瓶应用程序中使用芹菜将任务从一台服务器发送到另一台服务器



我有两个烧瓶应用程序,一个在服务器 A 上,另一个在服务器 B 上。我想做的是在某些条件下从服务器 A 上的应用程序生成一个异步任务,并将其发送到服务器 B 上的应用程序(即调用服务器 B 上的函数(。我认为芹菜发送任务方法将用于它,但不知道如何使用它。

假设我在服务器 B 上的应用程序中有一个函数"func">

def func(x):
return x

我想在服务器 A 上的应用程序中的另一个函数"somefunc"中调用"func",如下所示:

def somefunc(x):
if condition is True:
func(x)   

我将如何使用芹菜来实现此逻辑?请提前帮助和感谢

在服务 A 上,您将拥有以下内容:

from celery.execute import send_task

@app.route('/')
def endpoint():
if cond(x):
send_task(
'task_service_b',
(param1, param2),
exchange='if u have a specific one',
routing_key='a routing key'
)

在服务 b 上,您需要让应用程序侦听"路由密钥"并绑定到交换"如果您有特定的路由密钥",

messaging_exchange = Exchange('if u have a specific one')
bindings = (
binding(messaging_exchange, routing_key=routing_key)
for routing_key in ['a routing key']
)
default_binding = binding(
Exchange(celery_app.conf.task_default_queue),
routing_key=celery_app.conf.task_default_queue
)
celery_app.conf.task_queues = [
# default queue has same routing key as name of the queue
Queue(celery_app.conf.task_default_queue, [default_binding]),
Queue('service.b.queue', list(bindings))
]

否则,您可以绕过所有队列,只send_task服务 B 队列。 您将需要服务 B 上的芹菜工作线程,因为该任务需要由该辅助角色使用

我从您的措辞中假设您在服务器 A 和 B 上运行不同的应用程序。如果它们是相同的应用程序,使用相同的芹菜代理和后端,那么命名队列,其中一个队列由仅在 B 上运行的芹菜工人提供服务,可以为您提供所需的效果。

如果 A 和 B 运行不同的代码,则安全的方法是让 A 上的异步任务向 B 上的终结点发出 HTTP 请求,该终结点调用该函数并在 HTTP 响应中发送回答案,以便 A 中的异步任务进行处理。

阐述:

运行缓慢的异步任务(例如,在 tasks.py 中(

@celery.task
def slow_running_task():
...

配置为在特定队列中运行

CELERY_ROUTES = {
'tasks.slow_running_task': {'queue': 'slow'},
...

可以通过仅在该服务器上运行具有-Q slow的芹菜工作器来在特定服务器上运行。

有细微差别。值得浏览一下芹菜文档。

相关内容

  • 没有找到相关文章

最新更新