持久编排器在Task.WhenAll之后卡住



我有一个包含4个活动的编排器:

  1. PromoDataFromActpm-下载一些数据的单个活动从API。
  2. PromoDataExport-从活动#1发送数据的单个活动到Azure存储
  3. SavePromoProductFromACPSActivity-平行活动,即为活动#1中的每个项目调用一些API并进行下载一些数据
  4. TableToBlobPromoProductActivity-写的并行活动从活动#3到blob存储的项目

对于活动#3,集合中的每个项目是一个活动调用,对于活动#4,每个活动调用被批处理为50个项目,这些项目正在等待Task.WhenAll.

在本地环境下一切正常,但在azure上,编排器在活动#3 Task后停止处理。因为某些原因。我得到许多请求SavePromoProductFromACPSActivity在日志中,应该是这样的,但一段时间后,它们停止并TableToBlobPromoProductActivityActivity永远不会被调用。我只是偶尔得到"执行XYZ编排器",几分钟后得到"执行XYZ编排器",这些消息之间没有活动调用。

我已经和它斗争了一段时间,但没有成功。什么好主意吗?

代码如下:

var orchestrationId = context.InstanceId.Replace(":","");
var promoData = await context.CallActivityAsync<PromotionExportModel[]>(FunctionNamesExport.Activity.PromoDataFromActpm, null);
var exportResult = await context.CallActivityAsync<OperationResponse>(FunctionNamesExport.Activity.PromoDataExport, promoData);
var acpsTasks = new List<Task<List<PromotedProductExportModel>>>();
var acpsPromos = new List<PromotedProductExportModel>();
foreach (var promo in promoData)
{
acpsTasks.Add(context.CallActivityAsync<List<PromotedProductExportModel>>(FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo));
}
await Task.WhenAll(acpsTasks);
acpsTasks.ForEach(x => acpsPromos.AddRange(x.Result));
var promoDataBatched = acpsPromos.Batch(50);
var tasks = new List<Task>();
foreach(var arr in promoDataBatched)
{
var promoBlob = new PromotionExportSubModel
{
PromotionExportModel = arr.ToArray(),
blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray()),
orchestrationId = orchestrationId
};
tasks.Add(context.CallActivityAsync(FunctionNamesExport.Activity.TableToBlobPromoProductActivity, promoBlob));
}
await Task.WhenAll(tasks);

既然你在评论中提到你在步骤3中并行启动了3000个活动,我有一个想法可能会发生什么。请记住,每个活动都会向历史表中添加行,这些行都需要在编排器的每次重播时加载(在每个活动返回后发生)。所以加载时间会越来越长,内存占用也会越来越大。

我对此使用的一个典型解决方案是将工作拆分为子编排器。例如,将数据分成100个批次,为每个批次启动子编排器以并行运行。然后在子编排器中执行实际的第三步活动。通过这种方式,子协调器实例的历史表行被限制在100个活动所需的行,而主协调器只获得约30个结果。您可以对步骤4执行类似的操作。

相关内容

最新更新