Django Celerybeat PeriodicTask运行远远超出预期



我在Django,芹菜,dj芹菜&PeriodicTasks .

我创建了一个任务来为Adsense生成一个实时统计报告。下面是我的任务:

import datetime
import httplib2
import logging
from apiclient.discovery import build
from celery.task import PeriodicTask
from django.contrib.auth.models import User
from oauth2client.django_orm import Storage
from .models import Credential, Revenue

logger = logging.getLogger(__name__)

class GetReportTask(PeriodicTask):
    run_every = datetime.timedelta(minutes=2)
    def run(self, *args, **kwargs):
        scraper = Scraper()
        scraper.get_report()

class Scraper(object):
    TODAY = datetime.date.today()
    YESTERDAY = TODAY - datetime.timedelta(days=1)
    def get_report(self, start_date=YESTERDAY, end_date=TODAY):
        logger.info('Scraping Adsense report from {0} to {1}.'.format(
            start_date, end_date))
        user = User.objects.get(pk=1)
        storage = Storage(Credential, 'id', user, 'credential')
        credential = storage.get()
        if not credential is None and credential.invalid is False:
            http = httplib2.Http()
            http = credential.authorize(http)
            service = build('adsense', 'v1.2', http=http)
            reports = service.reports()
            report = reports.generate(
                startDate=start_date.strftime('%Y-%m-%d'),
                endDate=end_date.strftime('%Y-%m-%d'),
                dimension='DATE',
                metric='EARNINGS',
            )
            data = report.execute()
            for row in data['rows']:
                date = row[0]
                revenue = row[1]
                try:
                    record = Revenue.objects.get(date=date)
                except Revenue.DoesNotExist:
                    record = Revenue()
                record.date = date
                record.revenue = revenue
                record.save()
        else:
            logger.error('Invalid Adsense Credentials')

我用芹菜&RabbitMQ。以下是我的设置:

# Celery/RabbitMQ
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "****"
BROKER_VHOST = "myvhost"
CELERYD_CONCURRENCY = 1
CELERYD_NODES = "w1"
CELERY_RESULT_BACKEND = "amqp"
CELERY_TIMEZONE = 'America/Denver'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
import djcelery
djcelery.setup_loader()

乍一看一切似乎都工作,但在打开记录器并观察它运行后,我发现它连续至少运行了四次任务-有时更多。它似乎是每分钟运行一次,而不是每两分钟运行一次。我已经尝试改变run_every使用crontab,但我得到相同的结果。

我开始使用监控器celerybeat。下面是我使用的命令:

python manage.py celeryd -B -E -c 1

你知道为什么它不像预期的那样工作吗?

哦,还有一件事,在日期改变后,它继续使用它第一次运行的日期范围。因此,随着时间的推移,它会继续获取任务开始运行当天的统计数据——除非我在某个时刻手动运行任务,否则它会更改为我最后一次手动运行任务的日期。有人能告诉我为什么会这样吗?

考虑为这类任务创建一个具有一个工作进程和固定速率的单独队列,并将任务添加到这个新队列中,而不是直接从celerybeat运行它们。我希望这能帮助你找出你的代码出了什么问题,是celerybeat的问题还是你的任务运行时间比预期的长。

@task(queue='create_report', rate_limit='0.5/m')
def create_report():
    scraper = Scraper()
    scraper.get_report()
class GetReportTask(PeriodicTask):
    run_every = datetime.timedelta(minutes=2)
    def run(self, *args, **kwargs):
        create_report.delay()
在settings.py

   CELERY_ROUTES = {
     'myapp.tasks.create_report': {'queue': 'create_report'},
   }

启动额外的芹菜worker,用于处理队列中的任务

celery worker -c 1 -Q create_report -n create_report.local

问题2。昨天和今天变量是在类级别设置的,所以在一个线程内它们只设置一次。

相关内容

  • 没有找到相关文章

最新更新