用 Celery 调度 Django 方法



我有这个方法:

def getExchangeRates(): 
    """ Here we have the function that will retrieve the latest rates from fixer.io """
    rates = {}
    response = urlopen('http://data.fixer.io/api/latest?access_key=c2f5070ad78b0748111281f6475c0bdd')
    data = response.read()
    rdata = json.loads(data.decode(), parse_float=float) 
    rates_from_rdata = rdata.get('rates', {})
    for rate_symbol in ['USD', 'GBP', 'HKD', 'AUD', 'JPY', 'SEK', 'NOK']:
        try:
            rates[rate_symbol] = rates_from_rdata[rate_symbol]
        except KeyError:
            logging.warning('rate for {} not found in rdata'.format(rate_symbol)) 
            pass
    return rates
@require_http_methods(['GET', 'POST'])
def index(request):
    rates = getExchangeRates()
    fixerio_rates = [Fixerio_rates(currency=currency, rate=rate)
                 for currency, rate in rates.items()]
    Fixerio_rates.objects.bulk_create(fixerio_rates)
    return render(request, 'index.html') 

我想安排这个,比方说,每天上午9点,除了周末。

还没有找到一些关于如何根据这样一个特定的日期时间来安排它的综合教程,而且,我不知道我是否可以安排这种方法,或者在我的tasks文件中创建另一个继承此方法的方法,并在任何特定日期运行。

我的项目根目录中有celery.py文件,我的应用程序文件夹中有tasks.py文件。

或者,也许芹菜不是这种情况的出路?

有什么想法吗?

有一些 django 软件包可以让你使用 django 管理界面来管理"类 cron"作业。我过去同时使用django-chronograph和django-chroniker (https://github.com/chrisspen/django-chroniker)。还有django-cron(https://django-cron.readthedocs.io/en/latest/installation.html),但我从未使用过它。

它们都有类似的方法:您在 crontab runninng 上创建一个名为 python manage.py runcrons 的条目,然后在您的settings.py上添加包以在 admin 上显示它。

查看 Chroniker 或 Django-cron 的文档,以获取有关如何设置它的更多信息。

此外,您可以使用 Celery Beat 来安排您需要的任务。

任务可以用芹菜节拍来安排。

芹菜节拍必须作为另一个过程启动。此beat进程会将计划任务踢到芹菜worker进程,该进程将像任何其他芹菜异步任务一样启动任务。要执行这两个过程通常是一个好主意,例如在生产中使用主管和在开发中使用 honcho。

计划任务可以在代码中定义,也可以存储在数据库中,并通过扩展名为django-celery-beat的django-admin进行处理

若要通过代码添加它,最简单的方法是在tasks.py文件中创建另一个方法。对于您每天上午9点的要求,除了周末"它可能看起来像这样"

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Executes every Monday morning at 9 a.m.
    sender.add_periodic_task(
        crontab(hour=9, minute=0, day_of_week=1),
        test.s('Happy Mondays!'),
    )
    sender.add_periodic_task(
        crontab(hour=9, minute=0, day_of_week=2),
        test.s('Happy Tuesday!'),
    )
    sender.add_periodic_task(
        crontab(hour=9, minute=0, day_of_week=3),
        test.s('Happy Wednesday!'),
    )
    sender.add_periodic_task(
        crontab(hour=9, minute=0, day_of_week=4),
        test.s('Happy Thursday!'),
    )
    sender.add_periodic_task(
        crontab(hour=9, minute=0, day_of_week=1),
        test.s('Happy Friday!'),
    )
@app.task
def test(arg):
    print(arg)

相关内容

  • 没有找到相关文章

最新更新