我正在尝试让一个函数在我的 django 项目中使用 celerybeat 从包装器库中导入基于类的函数。 我一直在读到芹菜不太容易上课。我的函数login_mb不接受参数,但是当我尝试注册并调用此任务时,我收到错误Couldn't apply scheduled task login_mb: login_mb() takes 0 positional arguments but 1 was given
这是因为导入的包装器函数中的 self 吗?
我能做些什么来让它与芹菜节拍一起工作?
settings.py
CELERY_BEAT_SCHEDULE = {
'login_mb': {
'task': 'backend.tasks.login_mb',
'schedule': timedelta(minutes=30),
} ,
tasks.py
from matchbook.apiclient import APIClient
import logging
from celery import task
log = logging.getLogger(__name__)
@shared_task(bind=True)
def login_mb():
mb = APIClient('abc', '123')
mb.login()
mb.keep_alive()
apiclient.py(包装库(
from matchbook.baseclient import BaseClient
from matchbook import endpoints
class APIClient(BaseClient):
def __init__(self, username, password=None):
super(APIClient, self).__init__(username, password)
self.login = endpoints.Login(self)
self.keep_alive = endpoints.KeepAlive(self)
self.logout = endpoints.Logout(self)
self.betting = endpoints.Betting(self)
self.account = endpoints.Account(self)
self.market_data = endpoints.MarketData(self)
self.reference_data = endpoints.ReferenceData(self)
self.reporting = endpoints.Reporting(self)
def __repr__(self):
return '<APIClient [%s]>' % self.username
def __str__(self):
return 'APIClient'
该错误与您的包装库无关,您的任务似乎没有任何问题。
出现此问题是因为您已使用bind=True定义了任务 这样做时,芹菜自动机会将一个参数注入到包含当前任务相关信息的方法中。因此,您可以删除bind=True,也可以像这样向任务方法添加一个参数:
@shared_task(bind=True)
def login_mb(self):
mb = APIClient('abc', '123')
mb.login()
mb.keep_alive()