F#Akka.NET代理同步时的性能优化



我正在努力解决以下问题。我有一些代理实时运行,具有几毫秒的大心跳,因此它们处理的操作顺序大多是确定的(因为消息处理不是瓶颈)。

现在,我正在对我不再有心跳的系统进行大量模拟(否则需要几个世纪),但我需要确保操作顺序得到保留。为此,我采用了以下解决方案:模拟器通过发布一条伪同步消息并在等待答案时进行阻塞,确保每个代理都处理了自己的消息队列。这对我的应用程序确实有效,但所需的时间并不直观,因为单线程实现会快一个数量级(我想是-x100 ish,尽管我还没有测试)。

我已经隔离了一个显示问题的小测试,甚至试图使用另一个库akka.net

type Greet = 
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2  
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()    
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello2 -> sender.Tell(true)
| _ -> ()
return! loop()
}
loop()
let greeterF =
MailboxProcessor.Start
(fun inbox ->                
async {
while true do
let! msg = inbox.Receive()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello reply -> reply.Reply true
| _ -> ()
}
)
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()
for i = 1 to n do
let rep = greeterF.PostAndReply(fun reply -> (Hello reply)) |> ignore
()
printfn "elapsed Mailbox:%A" t1.ElapsedMilliseconds
t1.Restart()
for i = 1 to n do        
let res = greeter.Ask (Hello2)
let rr = res.Result
()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0

基本上,仅100万次同步都需要大约10秒的时间,而不是计算所涉及的内容,这是……不幸的。

我想知道是否有人遇到过同样的问题,是否有任何方法可以关闭强制所有操作以单线程模式运行的开销。。。这比取消激活bios中除1以外的所有cpu要好,或者在没有代理的情况下编写整个系统的克隆。

非常感谢您的帮助。

Akka.NET版本运行缓慢的原因是您如何与参与者通信:

main process    Task     FutureActorRef  !!ThreadPool!!   greeter
Ask ---------------------->
Tell-----------> 
MailboxRun ----->
(greeter mailbox is empty)  |                 
<--------------------------Tell 
<--Complete task
<----------.Result
  1. 对于每个迭代,将创建一个TPL任务

  2. 然后向迎宾员发送一条单一消息

  3. 然后,主进程在等待响应返回时阻塞。

  4. 迎宾员回复,进而完成FutureActorRef内的任务

冲洗并重复。。这种设计将导致Akka.NET在每次迭代的邮箱队列变为空时,为每条消息启动和停止迎宾器"邮箱运行"。这将为传递的每个消息生成线程池调度。

这有点像进入你的车,把踏板踩在金属上,然后突然停下来走出车,然后再次重复这个过程。这不是一种快速旅行的有效方式。

@Aaronontheeb的建议只有在您的代码中解决了上述问题时才会生效。邮箱需要能够不断地从内部队列中挑选消息,以便批量处理消息,以实现全吞吐量。

相反,把生产者和消费者分开。创建一个能倾听迎宾员回应的演员。一旦该参与者处理了您的1000000条消息,就让该参与者将WorkCompleted消息发送回消费者。

[编辑]我自己尝试了一下,我不知道F#,所以它可能不是完全地道的:)

open Akka
open Akka.Actor
open Akka.FSharp
type Greet = 
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2 
type Consume =
| Response
| SetSender
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()    
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello2 -> sender.Tell(Response)
| _ -> ()
return! loop()
}
loop()
let consumer = spawn system "consumer" <| fun mailbox ->
let rec loop(count,sender : IActorRef) = actor {
if count = 1000000 then sender.Tell(true)
let! msg = mailbox.Receive()
match msg with
| Response -> return! loop(count+1,sender)
| SetSender -> return! loop(count,mailbox.Sender())
}  
loop(0,null)      
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()   
for i = 1 to n do        
greeter.Tell(Hello2,consumer)
let workdone = consumer.Ask SetSender
workdone.Wait()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0

我更新了您的代码,以便对参与者的响应使用一个单独的消费者,然后在处理完所有回复后进行回复。

通过这样做,您在我的机器上的处理时间现在减少到650毫秒。

如果您想要更好的吞吐量,您需要涉及更多的参与者来并行化更多。

我不确定这是否有助于你的特定场景

这里有一个稍微修改过的MailboxProcessor版本:

module MBPAsync =
type Greet = 
| Greet of string
| Hello of AsyncReplyChannel<bool>
let run n =
let timer = Stopwatch.StartNew ()
use greeter =
MailboxProcessor.Start <| fun inbox -> async {
while true do
let! msg = inbox.Receive()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello reply -> reply.Reply true
}
Async.RunSynchronously <| async {
for i = 1 to n do
do! Async.Ignore (greeter.PostAndAsyncReply Hello)
}
let elapsed = timer.Elapsed
printfn "%A" elapsed

这里的区别在于,这个版本使用PostAndAsyncReply,并将计算保持在异步工作流中。在我的快速测试中,这似乎比使用PostAndReply要快得多,但使用YMMV。

我从上面的MBP版本中得到的时间大致如下:

> MBPAsync.run 1000000 ;;
00:00:02.6883486
val it : unit = ()

前面的一条评论提到了我的Hopac图书馆。以下是使用Hopac的优化版本:

module Hop =
type Greet = 
| Greet of string
| Hello of IVar<bool>
let run n =
let timer = Stopwatch.StartNew ()
let greeterCh = ch ()
do greeterCh >>= function
| Greet who -> Job.unit ()
| Hello reply -> reply <-= true
|> Job.forever
|> server
Job.forUpToIgnore 1 n <| fun _ ->
let reply = ivar ()
greeterCh <-- Hello reply >>.
reply
|> run
let elapsed = timer.Elapsed
printfn "%A" elapsed

我从上面的Hopac版本中得到的时间大致如下:

> Hop.run 1000000 ;;
00:00:00.1088768
val it : unit = ()

我不是F#开发人员,但我是Akka的核心开发人员。NET。针对您的场景的几个想法:

  1. 如果你只使用一个演员来完成这项工作,你可以尝试使用PinnedDispatcher——这样演员就可以一直在自己的专用线程上运行。这将节省不必要的上下文切换开销。

  2. 您还可以将此PinnedDispatcher的邮箱吞吐量设置为比正常设置高得多。即,将吞吐量值设置为10000(或大约)而不是正常的25。假设您的邮箱内容大量增长,这将节省邮箱同步开销。

以下是您的调度器配置:

my-pinned-dispatcher {
type = PinnedDispatcher
throughput = 1000 #your mileage may vary
}

然后配置一个参与者使用它

C#Fluent接口

var myActor = myActorSystem.ActorOf(Props.Create<FooActor>()
.WithDispatcher("my-pinned-dispatcher");

配置

akka.actor.deployment{
/greeter{
dispatcher = my-pinned-dispatcher
}
}

这两个选项都可以通过App.config或Web.config中的HOCON进行配置,也可以使用Props类上的fluent接口进行配置。同样值得注意的是:目前有一个固定调度器的错误,但应该在下周发布的下一个维护版本(v1.0.1)中修复。

你的里程数可能会有所不同,但这是我想要尝试的——基本上,它只是为了帮助减少围绕单个演员的争论和开销。

相关内容

  • 没有找到相关文章

最新更新