Akka.Net 流比并行循环慢



我有以下两个代码:

var jbesSource = Source.From(jobSeekers);
var runnableGraph = jbesSource
.Via(Flow.FromGraph(GraphDsl.Create(b =>
{
Flow<(JobSeeker_JBE, List<JBE>), Dictionary<JBE, ICollection<JobCI>>, NotUsed> worker =
FindJobsFlow.Instance(cache);
var merge = b.Add(new Merge<Dictionary<JBE, ICollection<JobCI>>>(8));
var balancer2 = b.Add(new Balance<JobSeeker_JBE>(8));
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
b.From(balancer2).Via(filterJobAlertsFlow).Via(worker).To(merge);
return new FlowShape<JobSeeker_JBE, Dictionary<JBE, ICollection<JobCI>>>
(balancer2.In, merge.Out);
})).Async())
.Async()
.ToMaterialized(Sink.Seq<Dictionary<JBE, ICollection<JobCI>>>(), Keep.Right);

这比上面快得多:

Parallel.ForEach(jobseekers, js =>
{
var jobs = FindJobs(js);
}

两者都在做完全相同的工作。

FindJobs函数使用REST API,所以它基本上是IO。

知道为什么一个简单的循环会比Akka.Streaming快得多吗?

Akka.NET 流的主要用例是处理长(可能是无限(的事件流,这些事件流可以包含许多不同的处理步骤。如果您想要并行调用多个作业,那么这绝对不是适合您需求的工具。

话虽如此,如果您想异步处理不同的事件,您可以像这样处理它:

jbesSource
// use SelectAsync if the order of output values must match order of inputs
.SelectAsyncUnordered(maxParallelism, FindJobsAsync)
.RunWith(Sink.Seq<T>(), materializer);

例如,maxParallelism可以是Environment.ProcessorCount或与计算机核心数匹配的任何值。

最新更新