芹菜/红色任务并不总是完成 - 不知道为什么或如何适应它



我正在django v 3.0.1应用程序(Python v 3.6.9(中运行celele v 4.0.3/redis v 4.09。我还在celele任务find_faces中使用face_recognition来查找上传到应用程序的图像中的人脸,以及其他图像处理celele的任务。处理五个或五个以下的图像文件没有问题,因为所有的图像处理芹菜任务都成功完成了。

当我让图像处理任务(包括find_faces(迭代100个图像时,有10-30个图像的find_faces任务没有完成。当我使用flower v0.9.7来查看芹菜任务时,我看到find_faces任务状态为"0";"启动";对于那些未完成的图像。所有其它图像都具有CCD_ 5任务状态;成功";。这些";"启动";任务永远不会更改,也不会报告任何错误或异常。然后,我可以在这些图像中的每个图像上单独运行图像处理任务,包括find_faces任务,并且任务状态为"0";成功";。如果我作为守护进程或在本地运行芹菜,或者如果我使用wsgi和apache或runserver运行django应用程序,这些结果不会改变。Flower还报告我的所有任务的重试次数均为0。

我在django应用程序中全局设置了CELERYD_TASK_SOFT_TIME_LIMIT = 60max_retries=5用于find_faces任务。

@app.task(bind=True, max_retries=5)
def find_faces_task(self, document_id, use_cuda=settings.USE_CUDA):
logger.debug("find_faces_task START")
try:
temp_face = None
from memorabilia.models import TaskStatus, Document      
args = "document_id=%s, use_cuda=%s" % (document_id, use_cuda)
ts = TaskStatus(document_id_id=document_id, task_id=self.request.id, task_name='find_faces_task', task_args=args, task_status=TaskStatus.PENDING)
ts.save()
import time
time_start = time.time()
# Check if we already have the faces for this document
from biometric_identification.models import Face
if len(Face.objects.filter(document_id=document_id)) != 0:
# This document has already been scanned, so need to remove it and rescan
# Have to manually delete each object per django docs to insure the 
# model delete method is run to update the metadata.
logger.debug("Document %s has already been scanned" % document_id)
faces = Face.objects.filter(document_id=document_id)
for face in faces:
face.delete()
logger.debug("Deleted face=%s" % face.tag_value.value)
document = Document.objects.get(document_id=document_id)
image_file = document.get_default_image_file(settings.DEFAULT_DISPLAY_IMAGE)
image_path = image_file.path
time_start_looking = time.time()
temp_file = open(image_path, 'rb')
temp_image = Image.open(temp_file)
logger.debug("fred.mode=%s" % fred.mode)
width, height = temp_image.size
image = face_recognition.load_image_file(temp_file)
# Get the coordinates of each face
if use_cuda:
# With CUDA installed
logger.debug("Using CUDA for face recognition")
face_locations = face_recognition.face_locations(image, model="cnn", number_of_times_to_upsample=0) 
else:
# without CUDA installed
logger.debug("NOT using CUDA for face recognition")
face_locations = face_recognition.face_locations(image, model="hog", number_of_times_to_upsample=2)
time_find_faces = time.time()
# Get the face encodings for each face in the picture    
face_encodings = face_recognition.face_encodings(image, known_face_locations=face_locations) 
logger.debug("Found %s face locations and %s encodings" % (len(face_locations), len(face_encodings)))
time_face_encodings = time.time()
# Save the faces found in the database
for location, encoding in zip(face_locations, face_encodings):
# Create the new Face object and load in the document, encoding, and location of a face found
# Locations seem to be of the form (y,x)
from memorabilia.models import MetaData, MetaDataValue
tag_type_people = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Tag_types')[0].metadata_id, value='People')[0]
tag_value_unknown = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Unknown')[0].metadata_id, value='Unknown')[0]
new_face = Face(document=document, face_encoding=numpy_to_json(encoding), face_location=location, image_size={'width': width, "height":height}, tag_type=tag_type_people, tag_value=tag_value_unknown)         
# save the newly found Face object
new_face.save()
logger.debug("Saved new_face %s" % new_face.face_file) 
time_end = time.time()
logger.debug("total time = {}".format(time_end - time_start))
logger.debug("time to find faces = {}".format(time_find_faces - time_start_looking))
logger.debug("time to find encodings = {}".format(time_face_encodings - time_find_faces))
ts.task_status = TaskStatus.SUCCESS
ts.comment = "Found %s faces" % len(face_encodings)
return document_id
except Exception as e:
logger.exception("Hit an exception in find_faces_task %s" % str(e))
ts.task_status = TaskStatus.ERROR
ts.comment = "An exception while finding faces: %s" % repr(e)
finally:
logger.debug("Finally clause in find-faces_task")
if temp_image:
temp_image.close()
if temp_file:
temp_file.close()
ts.save(update_fields=['task_status', 'comment'])
logger.debug("find_faces_task END")

find_faces任务被称为操作图像的更大任务链的一部分。每个图像文件都经过这个链,其中步骤_1和步骤_2是不同图像处理步骤的和弦:

step_1 = chord( group( clean ), chordfinisher.si() ) # clean creates different image sizes
step_2 = chord( group( jobs ), chordfinisher.si() )  # jobs include find_faces
transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job, change_state_task.si(document_id, 'ready')).delay())
@app.task
def chordfinisher( *args, **kwargs ):
return "OK"

图像很大,因此find_faces任务最多需要30秒才能完成。我以为CELERYD_TASK_SOFT_TIME_LIMIT = 60可以处理这么长的处理时间。

我决不是芹菜专家,所以我认为我需要启用一个芹菜设置或选项,以确保find_faces任务始终完成。我只是不知道那会是什么。

经过更多的研究,我可以接受刘易斯·卡罗尔在这篇文章中的这个建议"小心那个杀oom的人,我的儿子!咬的颚,抓的爪&";,这个帖子Chaining Chords产生了巨大的消息,导致工人OOM,这个帖子WorkerLostError:工人过早退出:exitcode 155。

我的芹菜工人似乎已经没有记忆了,因为我确实在我的系统日志中发现了可怕的oomkiller的痕迹。我将我的任务重新配置为只在一个链中(删除了所有的组和和弦(,这样每个任务都会针对每个图像按顺序单独运行,并且无论我处理了多少图像,任务都会成功完成。

相关内容

  • 没有找到相关文章

最新更新