使用 Celery 使用参数初始化工作线程



我找不到对我来说似乎相对简单的东西时遇到了问题。

我正在将 Celery 3.1 与 Python 3 一起使用,并希望使用参数初始化我的工作线程,以便他们可以使用这些详细信息进行设置。

具体而言:这些工作线程将使用需要使用身份验证凭据与第三方 API 交互的任务。在使用任何任务之前,辅助角色必须将身份验证详细信息传递给 API 服务器(身份验证详细信息在第一次身份验证请求后存储在 Cookie 中)。

我想在从 CLI 启动工作线程时将这些登录凭据传递给工作线程。然后,我希望工作人员使用它们进行身份验证并存储会话以供将来使用任务时使用(理想情况下,这将存储在可以从任务访问的属性中)。

芹菜可以吗?

作为旁注,我已经考虑过将requests.session对象(来自 Python requests 库)作为任务参数传递,但这需要序列化,这看起来是不受欢迎的。

我建议使用抽象的任务基类并缓存requests.session

来自芹菜文档:

任务

不会为每个请求实例化,而是在任务注册表中注册为全局实例。

这意味着每个进程只会调用一次 __init__ 构造函数,并且任务类在语义上更接近 Actor。

这对于缓存资源也很有用...

import requests
from celery import Task
class APITask(Task):
    """API requests task class."""
    abstract = True
    # the cached requests.session object
    _session = None
    def __init__(self):
        # since this class is instantiated once, use this method
        # to initialize and cache resources like a requests.session
        # or use a property like the example below which will create
        # a requests.session only the first time it's accessed
    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            session.auth = ('user', 'pass')
            self._session = session
        return self._session

现在,当您创建将发出 API 请求的任务时:

@app.task(base=APITask, bind=True)
def call_api(self, url):
    # self will refer to the task instance (because we're using bind=True)
    self.session.get(url)

此外,您还可以使用 app.task 装饰器传递 API 身份验证选项作为额外的参数,该参数将在任务__dict__设置,例如:

# pass a custom auth argument
@app.task(base=APITask, bind=True, auth=('user', 'pass'))
def call_api(self, url):
    pass

并使基类使用传递的身份验证选项:

class APITask(Task):
    """API requests task class."""
    abstract = True
    # the cached requests.session object
    _session = None
   # the API authentication
   auth = ()
    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            # use the authentication that was passed to the task
            session.auth = self.auth
            self._session = session
        return self._session

您可以在 Celery 文档网站上阅读更多信息:

  • 任务实例化
  • 任务抽象类

现在回到您的原始问题,即从命令行将额外的参数传递给工人:

在 Celery 文档中有关于此的部分添加新的命令行选项,下面是从命令行向工作线程传递用户名和密码的示例:

$ celery worker -A appname --username user --password pass

代码:

from celery import bootsteps
from celery.bin import Option

app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.')
)
app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.')
)

class CustomArgs(bootsteps.Step):
    def __init__(self, worker, api_username, api_password, **options):
        # store the api authentication
        APITask.auth = (api_username, api_password)

app.steps['worker'].add(CustomArgs)

我认为您可以使用命令行参数调用您编写的脚本。如下所示:

my_script.py username password

在脚本中,您可以将 main 函数包装在 @celery.task@app.task装饰器中。

import sys
from celery import Celery
cel = Celery() # put whatever config info you need in here
@celery.task
def main():
    username, password = sys.argv[1], sys.argv[2]

这样的事情应该让你开始。请务必查看 Python 的 argparse 以获得更复杂的参数解析。

相关内容

  • 没有找到相关文章