芹菜任务的动态注册



我想知道在运行时注册的任务中使用芹菜的最佳方式是什么。我的工作流程如下:

  • 启动芹菜应用程序
  • 启动python应用程序
  • python应用程序创建了一个新任务,我想用芹菜来安排它

我完成的方式是基于"插件"概念,该概念与带有自定义子命令的点击包具有相同的思想。

应用程序结构(基于python 3):

.
├── dynamic_tasks.py
├── run.py
└── tasks
└── get_rate.py

芹菜任务dynamic_tasks.py定义如下:

import os
import celery
app = celery.Celery('dynamic_tasks', broker='amqp://guest@192.168.169.1/', backend='rpc://')
PLUGIN_FOLDER = os.path.join(os.path.dirname(__file__), 'tasks')
def _absolutepath(filename):
""" Return the absolute path to the filename"""
return os.path.join(PLUGIN_FOLDER, filename)
@app.task
def tasks(funcname, *args, **kwargs):
try:
funcname = funcname.replace('-', '_')
funcname += '.py'
func = _absolutepath(funcname)
ns = {}
with open(func) as f:
code = compile(f.read(), func, 'exec')
eval(code, ns, ns)
return ns['task'](*args, **kwargs)
except IOError as e:
# Manage IOError
raise e

可插入任务示例tasks/get_rate.py:

""" This task get the currency rate between a pair of currencies """    
import urllib.request
URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'
def task(pair='EURSEK', url_tmplt=URL):
with urllib.request.urlopen(url_tmplt.format(pair)) as res:
body = res.read()
return (pair, float(body.strip()))

简单地说,运行run.py中的示例

from dynamic_tasks import tasks
print(tasks.delay('get_rate', 'EURSEK').get())

已编辑由于芹菜运行在不同的机器上,因此不可能依赖本地文件系统。我的新方法是发送函数以字符串形式执行:

@app.task
def dynamic_tasks(funcname, funccode, *args, **kwargs):
try:
ns = {}
code = compile(funccode, funcname, 'exec')
eval(code, ns, ns)
logger.info('execute %r with args %r, %r', funcname, args, kwargs)
return ns['task'](*args, **kwargs)
except IOError:
logger.error("Error loading the dynamic function from text %s", funcname)

相关内容

  • 没有找到相关文章

最新更新