我有两个烧瓶应用程序,一个在服务器 A 上,另一个在服务器 B 上。我想做的是在某些条件下从服务器 A 上的应用程序生成一个异步任务,并将其发送到服务器 B 上的应用程序(即调用服务器 B 上的函数(。我认为芹菜发送任务方法将用于它,但不知道如何使用它。
假设我在服务器 B 上的应用程序中有一个函数"func">
def func(x):
return x
我想在服务器 A 上的应用程序中的另一个函数"somefunc"中调用"func",如下所示:
def somefunc(x):
if condition is True:
func(x)
我将如何使用芹菜来实现此逻辑?请提前帮助和感谢
在服务 A 上,您将拥有以下内容:
from celery.execute import send_task
@app.route('/')
def endpoint():
if cond(x):
send_task(
'task_service_b',
(param1, param2),
exchange='if u have a specific one',
routing_key='a routing key'
)
在服务 b 上,您需要让应用程序侦听"路由密钥"并绑定到交换"如果您有特定的路由密钥",
messaging_exchange = Exchange('if u have a specific one')
bindings = (
binding(messaging_exchange, routing_key=routing_key)
for routing_key in ['a routing key']
)
default_binding = binding(
Exchange(celery_app.conf.task_default_queue),
routing_key=celery_app.conf.task_default_queue
)
celery_app.conf.task_queues = [
# default queue has same routing key as name of the queue
Queue(celery_app.conf.task_default_queue, [default_binding]),
Queue('service.b.queue', list(bindings))
]
否则,您可以绕过所有队列,只send_task
服务 B 队列。 您将需要服务 B 上的芹菜工作线程,因为该任务需要由该辅助角色使用
我从您的措辞中假设您在服务器 A 和 B 上运行不同的应用程序。如果它们是相同的应用程序,使用相同的芹菜代理和后端,那么命名队列,其中一个队列由仅在 B 上运行的芹菜工人提供服务,可以为您提供所需的效果。
如果 A 和 B 运行不同的代码,则安全的方法是让 A 上的异步任务向 B 上的终结点发出 HTTP 请求,该终结点调用该函数并在 HTTP 响应中发送回答案,以便 A 中的异步任务进行处理。
阐述:
运行缓慢的异步任务(例如,在 tasks.py 中(
@celery.task
def slow_running_task():
...
配置为在特定队列中运行
CELERY_ROUTES = {
'tasks.slow_running_task': {'queue': 'slow'},
...
可以通过仅在该服务器上运行具有-Q slow
的芹菜工作器来在特定服务器上运行。
有细微差别。值得浏览一下芹菜文档。