我有一个类似的函数:
def long_running_with_more_values(start, stop):
headers = get_headers.delay(start, stop)
insert_to_db.delay(headers)
此函数用于批处理从网络并行请求的数据。Get_headers + insert_to_db被触发到消息堆栈,最后由celery worker处理,所以不会阻塞执行。
它必须处理开始和停止之间的每个数字,但可以将其拆分为段(范围)。
我发现操作get_headers是最佳的,当范围是~20000,其中范围=(停止-开始)
我想知道如何将任意范围分成20000组,并通过函数运行每组,这样我最终会以不同的开始和停止值多次调用函数,但仍然覆盖前一个范围。
所以对于start和stop的起始值分别为1和100000,我希望get_headers被调用5次:
[1,20000][20001,40000][40001,60000][60001,80000][80001,100000]
def long_running_with_more_values(start, stop):
while start < stop:
if stop - start < 20000:
headers = get_headers.delay(start, stop)
break
else:
headers = get_headers.delay(start, start + 20000)
start += 20000
insert_to_db.delay(headers)
注意,headers
将只存储最后一次调用get_headers.delay()
的返回值。您可能需要将代码更改为headers += get_headers.delay(start, stop)
。如果不知道get_headers.delay()
方法的返回值是什么,我真的不能告诉。