Akka.net Actor并行执行



我们在akka.net上进行POC来处理JSON文件。我正在为批处理过程Jarray的最佳方法而苦苦挣扎。在我的实施中,AKKA协调员Actor收到以下消息:

//coordinator actor receive
public class ValidatedInput
{
public JArray Data { get; set; }
}

我的协调员演员可以在下面的单个过程中处理完整的jarray,但我正在努力启动并行演员的数量,每个演员都会从jarray处理50个记录。

//coordinator actor receives messages and calls transform actor to process
public void Receiving()
{
Receive<ValidatedInput>(x =>
{
TransformerRouter.Tell(x);
});
}
//transform actor receives message and process, sample code
Receive<ValidatedInput>(x =>
{
PipeToSupport.PipeTo<TransformResult>(MapDataAsync(x).ContinueWith(data =>
{
return new TransformResult();}), Self);
});

是否有下面的任何方式,我可以通过50个Jarray记录由每个演员处理并收集结果,例如:

Receive<ValidatedInputDataResult>(
{
TransformerRouter.Tell(x.Data.Take(50);
});

一段时间没有使用akka.net,但是当我这样做时,我总是避免在可能的情况下传递藏品,这是两个主要原因:

  • 您可以发送给演员的消息大小是有限制的,尽管可以增加此限制,但不建议这样做。

  • 所有发送给演员的消息均已序列化,然后在Receive<>'D时进行了序列化,这意味着,如果您在消息中发送数组或其他对象集合,则可能会在每个大小物体上分配它们您使用Tell方法的时间,如果这是热代码路径,则应避免尽可能多的东西。

当时我解决了这种问题的方式是:

  • 有一个"顶级"协调员演员:
    • 包含一个在工人演员背后的路由器。例如,您可以将路由器配置为以圆形旋转方式分发消息。
    • 每当新消息是Receive'D时,就会产生"聚合"演员,工人演员将其结果发送给。您可以使用Tell方法并传递聚合器的演员参考,以便工人在其演员上下文中将聚合器视为Sender
  • "顶级"演员中的路由器被配置为在需要时自动产生更多演员
  • 工人参与者只做Receive单个消息,对其进行处理,然后在Tell中进行Sender

请记住,此建议可能不完整,因为我当时使用Actor Systems并不是很"流利",而且我没有积极使用Akka.net大约6个月了,并且可能会更好完成您需要的方法。

我建议在Google上搜索" Actor System模式"one_answers" Scala Actor模式",并阅读一些开源Scala项目源代码,这也将为您提供一些见解。

最后,避免未来头痛的提示:消息类型应始终不变。因此,您的ValidatedInput应该看起来像这样:

public class ValidatedInput
{
    public readonly JArray Data { get; }
    public ValidatedInput(JArray data)
    {
        Data = data;
    }
}

或更高:

public class ValidatedInput
{
    public readonly IReadOnlyList<JToken> Data { get; }
    public ValidatedInput(IReadOnlyList<JToken> data)
    {
        Data = data;
    }
}

希望这会有所帮助,祝您好运!

最新更新