我有三个晶粒(A、B和C(在管道中做不同的工作。GrainA将把结果传递给grainB,grainB将把结果传给grainC。我想保证在连续的晶粒之间按顺序发送消息,这可以通过以下方式实现。
// client code
foreach(var i in list)
{
await grainA.job(i);
}
//grain A code
async Task job(var i)
{
DoSomeWorkA(i);
await grainB.job(i);
}
//grain B code
async Task job(var i)
{
DoSomeWorkB(i);
await grainC.job(i);
}
//grain C code
async Task job(var i)
{
DoSomeWorkC(i);
Console.WriteLine(i);
}
然而,这个代码的问题是没有流水线。只有当当前对象通过所有的grainB和grainC时,grainA才会被赋予net对象(因为await语句(。获得流水线的一种方法是不使用等待,而是直接一个接一个地发送对象。然而,正如本文所解释的那样,这会导致订单交付出现问题。
我想让执行完全流水线化,这意味着当grainB接收到来自grainA的结果时,grainA继续执行它的下一个作业。但是,消息排序也很重要,因为我发送一些控制消息。如何在奥尔良做到这一点
为了使高度并发的系统易于编程和推理,Orleans将每个粒度(计算单位和状态(的并行性和并发性限制为1。这意味着,在任何时间点,最多将同时处理一条消息,最多将有一个线程执行给定粒度激活的代码。
这大大简化了开发人员需要关注的内容,因为不再需要像锁这样的线程同步原语。
但是,行为是可配置的。如果您不想要这种默认的"安全"行为,那么您可以在粒子的接口上将粒子标记为[Reentrant]
或将单个方法标记为[AlwaysInterleave]
。
这允许同时处理许多消息。在粒度方法中的每个await
点,执行都返回给调度器,调度器可以开始处理另一条消息。调度器仍然确保每个粒度激活的单线程性,因此仍然不需要锁,但允许消息在这些await
点进行交织(即协作多任务处理(。这意味着开发人员现在需要考虑内部状态如何被await
点之间的其他请求所改变。
有关更多信息,请参阅Orleans文档中的"重新进入"页面。
为了提高并行性(对于CPU绑定的任务(,开发人员可以通过Task.Run
或类似方式使用无状态工作者或外部任务。
还要注意,无论并发配置如何,从细粒度激活A发送到细粒度激活B的消息都将始终按顺序发送。同样,对于从B发送到A的结果,请注意,一旦并发性增加,仍然很难推断消息的顺序,但是,因为对B的第二次调用可能在第一次调用之前完成,因此其结果也会更早发送。这意味着结果可能看起来是无序接收的。不过,我想这正是你所期望的。