我们在akka.net上进行POC来处理JSON文件。我正在为批处理过程Jarray的最佳方法而苦苦挣扎。在我的实施中,AKKA协调员Actor收到以下消息:
//coordinator actor receive
public class ValidatedInput
{
public JArray Data { get; set; }
}
我的协调员演员可以在下面的单个过程中处理完整的jarray,但我正在努力启动并行演员的数量,每个演员都会从jarray处理50个记录。
//coordinator actor receives messages and calls transform actor to process
public void Receiving()
{
Receive<ValidatedInput>(x =>
{
TransformerRouter.Tell(x);
});
}
//transform actor receives message and process, sample code
Receive<ValidatedInput>(x =>
{
PipeToSupport.PipeTo<TransformResult>(MapDataAsync(x).ContinueWith(data =>
{
return new TransformResult();}), Self);
});
是否有下面的任何方式,我可以通过50个Jarray记录由每个演员处理并收集结果,例如:
Receive<ValidatedInputDataResult>(
{
TransformerRouter.Tell(x.Data.Take(50);
});
一段时间没有使用akka.net,但是当我这样做时,我总是避免在可能的情况下传递藏品,这是两个主要原因:
-
您可以发送给演员的消息大小是有限制的,尽管可以增加此限制,但不建议这样做。
-
所有发送给演员的消息均已序列化,然后在
Receive<>
'D时进行了序列化,这意味着,如果您在消息中发送数组或其他对象集合,则可能会在每个大小物体上分配它们您使用Tell
方法的时间,如果这是热代码路径,则应避免尽可能多的东西。
当时我解决了这种问题的方式是:
- 有一个"顶级"协调员演员:
- 包含一个在工人演员背后的路由器。例如,您可以将路由器配置为以圆形旋转方式分发消息。
- 每当新消息是
Receive
'D时,就会产生"聚合"演员,工人演员将其结果发送给。您可以使用Tell
方法并传递聚合器的演员参考,以便工人在其演员上下文中将聚合器视为Sender
。
- "顶级"演员中的路由器被配置为在需要时自动产生更多演员
- 工人参与者只做
Receive
单个消息,对其进行处理,然后在Tell
中进行Sender
。
请记住,此建议可能不完整,因为我当时使用Actor Systems并不是很"流利",而且我没有积极使用Akka.net大约6个月了,并且可能会更好完成您需要的方法。
我建议在Google上搜索" Actor System模式"one_answers" Scala Actor模式",并阅读一些开源Scala项目源代码,这也将为您提供一些见解。
最后,避免未来头痛的提示:消息类型应始终不变。因此,您的ValidatedInput
应该看起来像这样:
public class ValidatedInput
{
public readonly JArray Data { get; }
public ValidatedInput(JArray data)
{
Data = data;
}
}
或更高:
public class ValidatedInput
{
public readonly IReadOnlyList<JToken> Data { get; }
public ValidatedInput(IReadOnlyList<JToken> data)
{
Data = data;
}
}
希望这会有所帮助,祝您好运!