使用Django QuerySet处理数据库块的最佳方式



我正在对数据库中的所有行运行批处理操作。这涉及到选择每一个模型并对它做一些事情。把它分成几块,一块一块地做是有意义的。

我目前使用Paginator,因为它很方便。这意味着我需要对这些值进行排序,以便可以按顺序对它们进行分页。这确实会生成具有orderlimit子句的SQL语句,并且对于每个块,我认为Postgres可能会对整个表进行排序(尽管我不能声称对内部有任何了解)。我所知道的是,数据库是在大约50%的CPU,我认为这是太高了,只是做select s。

在RDMBS/cpu友好的方式中遍历整个表的最佳方式是什么?

假设数据库的内容在批处理操作期间没有变化

从您的描述中,您实际上并不关心排序顺序您处理的行。如果您的表中有主键(这是我所期望的!),那么这种简单的分区方法将快得多:

SELECT * FROM tbl WHERE id BETWEEN 0    AND 1000;
SELECT * FROM tbl WHERE id BETWEEN 1001 AND 2000;
...

对于任何偏移量和(几乎)任何大小的表执行相同的操作。相应地检索主键和分区的最小值和最大值:

SELECT min(id), max(id) from tbl; -- then divide in suitable chunks

相对于:

SELECT * FROM tbl ORDER BY id LIMIT 1000;
SELECT * FROM tbl ORDER BY id LIMIT 1000 OFFSET 1000;
...

这通常较慢,因为所有行都必须排序,并且随着偏移量的增加和表的增大,性能会进一步降低。

下面的代码为Django QuerySet实现了Erwin上面的答案(使用BETWEEN):

对于任意的Django QuerySet,下面是一个实用函数。它默认假设'id'是between子句的合适字段。

def chunked_queryset(qs, batch_size, index='id'):
    """
    Yields a queryset split into batches of maximum size 'batch_size'.
    Any ordering on the queryset is discarded.
    """
    qs = qs.order_by()  # clear ordering
    min_max = qs.aggregate(min=models.Min(index), max=models.Max(index))
    min_id, max_id = min_max['min'], min_max['max']
    for i in range(min_id, max_id + 1, batch_size):
        filter_args = {'{0}__range'.format(index): (i, i + batch_size - 1)}
        yield qs.filter(**filter_args)

可以这样使用:

for chunk in chunked_queryset(SomeModel.objects.all(), 20):
    # `chunk` is a queryset
    for item in chunk:
        # `item` is a SomeModel instance
        pass

您也可以更改接口,这样您就不需要额外的嵌套循环,但可以这样做for item in chunked_queryset(qs):

def chunked_queryset(qs, batch_size, index='id'):
    """
    Yields a queryset that will be evaluated in batches
    """
    qs = qs.order_by()  # clear ordering
    min_max = qs.aggregate(min=models.Min(index), max=models.Max(index))
    min_id, max_id = min_max['min'], min_max['max']
    for i in range(min_id, max_id + 1, batch_size):
        filter_args = {'{0}__range'.format(index): (i, i + batch_size - 1)}
        for item in qs.filter(**filter_args):
            yield item

最新更新