akka.net 路由器在广播毒丸时不会终止



我在尝试 akka.net 路由器时遇到问题。我的循环路由器在广播毒丸消息后不会终止,如果路由中存在使用主管策略处理的异常。如果未在路由中抛出或使用 try catch 处理异常,则路由器执行组件会正常终止。我的方法有什么遗漏吗?

重现问题的示例代码:

using System;
using System.IO;
using Akka.Actor;
using Akka.Event;
using Akka.Routing;
using NLog;
using NLog.Config;
using NLog.Targets;
using LogLevel = NLog.LogLevel;
namespace AkkaTest
{
class Program
{
static void Main(string[] args)
{
InitNlog();
var actorSystem = ActorSystem.Create("ActorSystem");
IActorRef coordinator = actorSystem.ActorOf(Props.Create(() => new Coordinator()));
for (int i = 0; i < 1000; i++)
{
ChildActor.ProcessData processData = new ChildActor.ProcessData(i);
coordinator.Tell(processData);
}
coordinator.Tell(new Coordinator.DisposeAll());
Console.ReadLine();
}
static void InitNlog()
{
// Step 1. Create configuration object 
var config = new LoggingConfiguration();
// Step 2. Create targets and add them to the configuration 
var consoleTarget = new ColoredConsoleTarget();
config.AddTarget("console", consoleTarget);
// Step 3. Set target properties 
consoleTarget.Layout = @"${date:format=HH:mm:ss} ${logger} ${message}";
// Step 4. Define rules
var rule1 = new LoggingRule("*", LogLevel.Debug, consoleTarget);
config.LoggingRules.Add(rule1);
// Step 5. Activate the configuration
LogManager.Configuration = config;
}
}
public class Coordinator : ReceiveActor
{
public class DisposeAll
{
}

private readonly ILoggingAdapter _logger = Context.GetLogger();
private IActorRef _consumer;
public Coordinator()
{
Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); });
Receive<DisposeAll>(x => { _consumer.Tell(x); });
}
protected override void PreStart()
{
if (Context.Child("Consumer").Equals(ActorRefs.Nobody))
{
_consumer = Context.ActorOf(
Props.Create(() => new Consumer())
, "Consumer");               
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(ex =>
{
if (ex is InvalidDataException)
{
return Directive.Resume;
}
return Directive.Stop;
});
}
}
public class Consumer : ReceiveActor
{
private readonly ILoggingAdapter _logger = Context.GetLogger();
private IActorRef _childRouter;
private int _progress;
public Consumer()
{
Receive<ChildActor.ProcessData>(x =>
{
_progress++;
if(_progress%100==0) _logger.Info("{0} items pushed to router", _progress);
_childRouter.Forward(x);
});
Receive<Terminated>(x =>
{                
_logger.Error("Child Router terminated.");
});
Receive<Coordinator.DisposeAll>(x => { _childRouter.Forward(new Broadcast(PoisonPill.Instance)); });
}
protected override void PreStart()
{
if (Context.Child("ChildRouter").Equals(ActorRefs.Nobody))
{
_childRouter =
Context.ActorOf(
Props.Create(() => new ChildActor())
.WithRouter(new RoundRobinPool(100))
.WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate)), "ChildRouter");
Context.Watch(_childRouter);
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(ex => Directive.Escalate);
}
}

public class ChildActor : ReceiveActor
{
public class ProcessData
{
public  int Data { get; private set; }
public ProcessData(int data)
{
Data = data;
}
}
private readonly ILoggingAdapter _logger = Context.GetLogger();
public ChildActor()
{
Receive<ProcessData>(x =>
{
if (x.Data % 5 == 0)
{
_logger.Info("{0} is Divisible by 5", x.Data);
}
else
{
//if this line is commented, router terminates just fine
throw new InvalidDataException("Error while processing.");
}
});
}
}
}

Broadcast消息用于将给定消息发送到给定路由器的所有子级。这也正是您正在发生的事情 - 您发送的是路由器的毒丸停止子级,而不是路由器本身。

如果要使用PoisonPill杀死路由器,请直接将其发送到路由器。

最新更新