如何等待芹菜链的任务完成?



我正在尝试在这里创建一个芹菜链:

chain(getAllProducts.s(shopname, hdrs),
editOgTags.s(title, description, whichImage, readableShopname, currentThemeId),
notifyBulkEditFinish.si(email, name, readableShopname, totalProducts),
updateBulkEditTask.si(taskID))()

在 editOgTags 中,有 3 个子任务:

@shared_task(ignore_result=True)
def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId):
for product in products:
editOgTitle.delay(product, title, readableShopname)
editOgDescription.delay(product, description, readableShopname)
editOgImage.delay(product, int(whichImage), currentThemeId)

在每个 editOgXXX 函数中,都有一个要调用的具有速率限制的函数:

@shared_task(rate_limit='1/s')
def updateMetafield(index, loop_var, target_id, type_value):
resource = type_value + 's'
# print(f"loop_var key = {loop_var[index]['key']}")
if type_value == 'product' or type_value == 'collection' or type_value == 'article' or type_value == 'page':
meta = shopify.Metafield.find(resource=resource, resource_id=target_id, namespace='global', key=loop_var[index]['key'])
checkAndWaitShopifyAPICallLimit()
else:
print("Not available metafield type! Cannot update.")
return
if meta:
# meta[0].destroy()
meta[0].value = loop_var[index]['value']
meta[0].save()
else:
metafield = shopify.Metafield.create({
'value_type': 'string',
'namespace': 'global',
'value': loop_var[index]['value'],
'value-type': 'string',
'key': loop_var[index]['key'],
'resource': resource,
'resource_id': target_id,
})
metafield.save()

在漏桶算法下,一次提供 40 次 API 调用,2 次请求/秒补充。由于shopify 功能有 2 个请求/秒的速率限制。我将速率限制设置为 1/s。当它用完 api 配额时,我将调用 time.sleep(20( 以等待 checkAndWaitShopifyAPICallLimit(( 中的补充。

问题是在所有任务完成之前调用电子邮件通知函数(notifyBulkEditFinish(。如何确保在所有任务完成后调用电子邮件功能?

我怀疑睡眠功能使任务落后于队列中的电子邮件功能。

您的问题在于"在所有任务完成后"的定义。

editOgTags启动len(products) * 3子任务 - 显然每个子任务都启动另一个异步子堆栈。如果要等到所有这些任务都执行完毕后再发送电子邮件,则需要一些同步机制。Celery 为此的内置解决方案是chord对象。ATM,您的代码等待editOgTags完成,但此任务唯一要做的就是启动其他子任务 - 然后它返回,无论这些子任务本身是否完成。

和弦就像一个组,但有一个回调。链基元允许我们将签名链接在一起,以便一个接一个地调用,本质上形成一个回调链。将链条改为和弦有什么区别?

请注意,我并不是说您应该用chord替换整个chain。提示:链和组和弦任务,因此您可以通过组合任务、链、组和和弦来创建复杂的工作流程。

如上所述,不同之处在于chord将等到其标头中的所有任务都完成后再执行回调。这允许并行执行 N 个异步任务,但仍在运行回调之前等待所有任务完成。这当然需要对您的代码进行一些思考和可能的重组(因此如果需要,会考虑子子任务(,但这确实回答了您的问题:"我如何确保在所有任务完成后调用电子邮件函数?

展开@bruno的注释:使用chord并修改editOgTags函数以创建一个与通知产生共鸣的组:

from celery import chord
@shared_task(ignore_result=True)
def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId, name, email, totalProducts):
tasks = []
for product in products:
tasks.append(editOgTitle.si(product, title, readableShopname))
tasks.append(editOgDescription.si(product, description, readableShopname))
tasks.append(editOgImage.si(product, int(whichImage), currentThemeId))
# kick off the chord, notifyBulk... will be called after all of these 
# edit... tasks complete.
chord(tasks)(notifyBulkEditFinish.si(email, name, readableShopname, totalProducts))

相关内容

  • 没有找到相关文章

最新更新