我正在努力解决以下问题。我有一些代理实时运行,具有几毫秒的大心跳,因此它们处理的操作顺序大多是确定的(因为消息处理不是瓶颈)。
现在,我正在对我不再有心跳的系统进行大量模拟(否则需要几个世纪),但我需要确保操作顺序得到保留。为此,我采用了以下解决方案:模拟器通过发布一条伪同步消息并在等待答案时进行阻塞,确保每个代理都处理了自己的消息队列。这对我的应用程序确实有效,但所需的时间并不直观,因为单线程实现会快一个数量级(我想是-x100 ish,尽管我还没有测试)。
我已经隔离了一个显示问题的小测试,甚至试图使用另一个库akka.net
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello2 -> sender.Tell(true)
| _ -> ()
return! loop()
}
loop()
let greeterF =
MailboxProcessor.Start
(fun inbox ->
async {
while true do
let! msg = inbox.Receive()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello reply -> reply.Reply true
| _ -> ()
}
)
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()
for i = 1 to n do
let rep = greeterF.PostAndReply(fun reply -> (Hello reply)) |> ignore
()
printfn "elapsed Mailbox:%A" t1.ElapsedMilliseconds
t1.Restart()
for i = 1 to n do
let res = greeter.Ask (Hello2)
let rr = res.Result
()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0
基本上,仅100万次同步都需要大约10秒的时间,而不是计算所涉及的内容,这是……不幸的。
我想知道是否有人遇到过同样的问题,是否有任何方法可以关闭强制所有操作以单线程模式运行的开销。。。这比取消激活bios中除1以外的所有cpu要好,或者在没有代理的情况下编写整个系统的克隆。
非常感谢您的帮助。
Akka.NET版本运行缓慢的原因是您如何与参与者通信:
main process Task FutureActorRef !!ThreadPool!! greeter
Ask ---------------------->
Tell----------->
MailboxRun ----->
(greeter mailbox is empty) |
<--------------------------Tell
<--Complete task
<----------.Result
对于每个迭代,将创建一个TPL任务
然后向迎宾员发送一条单一消息。
然后,主进程在等待响应返回时阻塞。
迎宾员回复,进而完成
FutureActorRef
内的任务
冲洗并重复。。这种设计将导致Akka.NET在每次迭代的邮箱队列变为空时,为每条消息启动和停止迎宾器"邮箱运行"。这将为传递的每个消息生成线程池调度。
这有点像进入你的车,把踏板踩在金属上,然后突然停下来走出车,然后再次重复这个过程。这不是一种快速旅行的有效方式。
@Aaronontheeb的建议只有在您的代码中解决了上述问题时才会生效。邮箱需要能够不断地从内部队列中挑选消息,以便批量处理消息,以实现全吞吐量。
相反,把生产者和消费者分开。创建一个能倾听迎宾员回应的演员。一旦该参与者处理了您的1000000条消息,就让该参与者将WorkCompleted消息发送回消费者。
[编辑]我自己尝试了一下,我不知道F#,所以它可能不是完全地道的:)
open Akka
open Akka.Actor
open Akka.FSharp
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2
type Consume =
| Response
| SetSender
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello2 -> sender.Tell(Response)
| _ -> ()
return! loop()
}
loop()
let consumer = spawn system "consumer" <| fun mailbox ->
let rec loop(count,sender : IActorRef) = actor {
if count = 1000000 then sender.Tell(true)
let! msg = mailbox.Receive()
match msg with
| Response -> return! loop(count+1,sender)
| SetSender -> return! loop(count,mailbox.Sender())
}
loop(0,null)
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()
for i = 1 to n do
greeter.Tell(Hello2,consumer)
let workdone = consumer.Ask SetSender
workdone.Wait()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0
我更新了您的代码,以便对参与者的响应使用一个单独的消费者,然后在处理完所有回复后进行回复。
通过这样做,您在我的机器上的处理时间现在减少到650毫秒。
如果您想要更好的吞吐量,您需要涉及更多的参与者来并行化更多。
我不确定这是否有助于你的特定场景
这里有一个稍微修改过的MailboxProcessor
版本:
module MBPAsync =
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
let run n =
let timer = Stopwatch.StartNew ()
use greeter =
MailboxProcessor.Start <| fun inbox -> async {
while true do
let! msg = inbox.Receive()
match msg with
| Greet who -> () // printf "Hello, %s!n" who
| Hello reply -> reply.Reply true
}
Async.RunSynchronously <| async {
for i = 1 to n do
do! Async.Ignore (greeter.PostAndAsyncReply Hello)
}
let elapsed = timer.Elapsed
printfn "%A" elapsed
这里的区别在于,这个版本使用PostAndAsyncReply
,并将计算保持在异步工作流中。在我的快速测试中,这似乎比使用PostAndReply
要快得多,但使用YMMV。
我从上面的MBP版本中得到的时间大致如下:
> MBPAsync.run 1000000 ;;
00:00:02.6883486
val it : unit = ()
前面的一条评论提到了我的Hopac图书馆。以下是使用Hopac的优化版本:
module Hop =
type Greet =
| Greet of string
| Hello of IVar<bool>
let run n =
let timer = Stopwatch.StartNew ()
let greeterCh = ch ()
do greeterCh >>= function
| Greet who -> Job.unit ()
| Hello reply -> reply <-= true
|> Job.forever
|> server
Job.forUpToIgnore 1 n <| fun _ ->
let reply = ivar ()
greeterCh <-- Hello reply >>.
reply
|> run
let elapsed = timer.Elapsed
printfn "%A" elapsed
我从上面的Hopac版本中得到的时间大致如下:
> Hop.run 1000000 ;;
00:00:00.1088768
val it : unit = ()
我不是F#开发人员,但我是Akka的核心开发人员。NET。针对您的场景的几个想法:
-
如果你只使用一个演员来完成这项工作,你可以尝试使用
PinnedDispatcher
——这样演员就可以一直在自己的专用线程上运行。这将节省不必要的上下文切换开销。 -
您还可以将此
PinnedDispatcher
的邮箱吞吐量设置为比正常设置高得多。即,将吞吐量值设置为10000(或大约)而不是正常的25。假设您的邮箱内容大量增长,这将节省邮箱同步开销。
以下是您的调度器配置:
my-pinned-dispatcher {
type = PinnedDispatcher
throughput = 1000 #your mileage may vary
}
然后配置一个参与者使用它
C#Fluent接口
var myActor = myActorSystem.ActorOf(Props.Create<FooActor>()
.WithDispatcher("my-pinned-dispatcher");
配置
akka.actor.deployment{
/greeter{
dispatcher = my-pinned-dispatcher
}
}
这两个选项都可以通过App.config或Web.config中的HOCON进行配置,也可以使用Props
类上的fluent接口进行配置。同样值得注意的是:目前有一个固定调度器的错误,但应该在下周发布的下一个维护版本(v1.0.1)中修复。
你的里程数可能会有所不同,但这是我想要尝试的——基本上,它只是为了帮助减少围绕单个演员的争论和开销。