# models.py
from django.db import models
class Person(models.Model):
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
text_blob = models.CharField(max_length=50000)
# tasks.py
import celery
@celery.task
def my_task(person):
# example operation: does something to person
# needs only a few of the attributes of person
# and not the entire bulky record
person.first_name = person.first_name.title()
person.last_name = person.last_name.title()
person.save()
在我的应用程序的某个地方,我有这样的东西:
from models import Person
from tasks import my_task
import celery
g = celery.group([my_task.s(p) for p in Person.objects.all()])
g.apply_async()
- 芹菜泡菜p发送给工人对吗?
- 如果worker在多台机器上运行,那么整个person对象(以及主要不需要的庞大text_blob)会通过网络传输吗?有办法避免吗?
如何有效且均匀地将Person记录分发给在多台机器上运行的工人?
这是一个更好的主意吗?如果Person有几百万条记录,它不会使数据库不堪重负吗?
# tasks.py import celery from models import Person @celery.task def my_task(person_pk): # example operation that does not need text_blob person = Person.objects.get(pk=person_pk) person.first_name = person.first_name.title() person.last_name = person.last_name.title() person.save() #In my application somewhere from models import Person from tasks import my_task import celery g = celery.group([my_task.s(p.pk) for p in Person.objects.all()]) g.apply_async()
我认为传递PK比传递整个模型对象更好更安全。由于PK只是一个数字,因此序列化也简单得多。最重要的是,您可以使用更安全的序列化器(json/yaml而不是pickle),并且在序列化模型时不会遇到任何问题。
正如这篇文章所说:
因为芹菜是一个分布式系统,你不能知道哪个进程,甚至任务将在哪个机器上运行。因此,你不应该将Django模型对象作为参数传递给任务,最好从数据库中重新获取对象,因为可能会涉及到竞争条件。
是。如果数据库中有数百万条记录,那么这可能不是最好的方法,但是由于您必须遍历所有数百万条记录,那么无论您做什么,您的数据库都会受到相当大的打击。
这里有一些替代方案,没有一个我认为"更好",只是不同。
- 为Person类实现一个pre_save信号处理程序,它执行.title()操作。这样,您的first_name/last_names将始终正确地存储在数据库中,您将不必再次这样做。
- 使用一个带有分页参数的管理命令…也许可以用姓氏的第一个字母来划分人物。所以运行
./manage.py my_task a
将更新姓氏以"a"开头的所有记录。显然,你必须运行这个几次才能遍历整个数据库 也许你可以用一些创造性的sql来做。我甚至不打算在这里尝试,但它可能值得研究。
请记住,.save()将比实际选择数百万条记录更难以"击中"数据库。