我遇到了一个奇怪的情况,芹菜将重新处理已完成的任务。总体设计看起来像这样:
芹菜节拍:定期拉出文件,如果将文件拉出,则在DB中创建了一个新条目,并在1个工作队列中将该文件的代表处理到另一个芹菜任务(这样,只有1个文件一次处理(
芹菜任务:处理文件,完成后完成,没有重试,没有循环。
@app.task(name='periodic_pull_file')
def periodic_pull_file():
for f in get_files_from_some_dir(...):
ingested_file = IngestedFile(filename=filename)
ingested_file.document.save(filename, File(f))
ingested_file.save()
process_import(ingested_file.id)
#deletes the file from the dir source
os.remove(....somepath)
def process_import(ingested_file_id):
ingested_file = IngestedFile.objects.get(id=ingested_file_id)
if 'foo' in ingested_file.filename.lower():
f = process_foo
else:
f = process_real_stuff
f.apply_async(args=[ingested_file_id], queue='import')
@app.task(name='process_real_stuff')
def process_real_stuff(file_id):
#dostuff
process_foo和process_real_stuff只是一个函数,一旦完成,就可以在文件上循环。我实际上可以跟踪它所处位置的百分比,我注意到的有趣的是,同一文件不断地一遍又一遍地处理(请注意,这些是大文件,处理很慢,需要花费数小时才能处理。现在我开始想知道它是否只是在队列中创建重复的任务。当我有13个未决的文件以导入的文件时,我检查了我的redis队列:
-bash-4.1$ redis-cli -p 6380 llen import
(integer) 13
和aha,13,我检查了每个排队任务的内容,以查看是否仅重复使用:
redis-cli -p 6380 lrange import 0 -1
它们都是独特的任务,具有唯一的ingested_file_id。我忽略了什么吗?有什么原因为什么它会一遍又一遍地完成任务 ->循环?直到最近才开始发生,没有代码更改。在事物曾经变得很活泼且无缝之前。我知道这也不是从某种程度上神奇地重试的"失败"过程中,因为它没有在队列中移动。即,它一次又一次地以相同的顺序接收相同的任务,因此它永远不会触摸它应该处理的其他13个文件。
注意,这是我的工人:
python manage.py celery worker -A myapp -l info -c 1 -Q import
使用此
芹菜-Q Your_queue_name Purge