F#MailboxProcessor的银行帐户kata速度慢



我已经对"古典的";F#MailboxProcessor的银行账户kata是线程安全的。但是,当我尝试将事务并行添加到帐户时,速度非常慢,非常快:10个并行调用有响应(2ms(,20个没有响应(9秒((参见下面的最后一次测试Account can be updated from multiple threads(

由于MailboxProcessor每秒支持3000万条消息(请参阅Burningmonk的文章(,问题从哪里来?

// -- Domain ----
type Message =
| Open of AsyncReplyChannel<bool>
| Close of AsyncReplyChannel<bool>
| Balance of AsyncReplyChannel<decimal option>
| Transaction of decimal * AsyncReplyChannel<bool>
type AccountState = { Opened: bool; Transactions: decimal list }
type Account() =
let agent = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (state: AccountState) =
async {
let! message = inbox.Receive()
match message with
| Close channel ->
channel.Reply state.Opened
return! loop { state with Opened = false }
| Open channel ->
printfn $"Opening"
channel.Reply (not state.Opened)
return! loop { state with Opened = true }
| Transaction (tran, channel) ->
printfn $"Adding transaction {tran}, nb = {state.Transactions.Length}"
channel.Reply true
return! loop { state with Transactions = tran :: state.Transactions }
| Balance channel ->
let balance =
if state.Opened then
state.Transactions |> List.sum |> Some
else
None
balance |> channel.Reply
return! loop state
}
loop { Opened = false; Transactions = [] }
)
member _.Open () = agent.PostAndReply(Open)
member _.Close () = agent.PostAndReply(Close)
member _.Balance () = agent.PostAndReply(Balance)
member _.Transaction (transaction: decimal) =
agent.PostAndReply(fun channel -> Transaction (transaction, channel))
// -- API ----
let mkBankAccount = Account
let openAccount (account: Account) =
match account.Open() with
| true -> Some account
| false -> None
let closeAccount (account: Account option) =
account |> Option.bind (fun a ->
match a.Close() with
| true -> Some a
| false -> None)
let updateBalance transaction (account: Account option) =
account |> Option.bind (fun a ->
match a.Transaction(transaction) with
| true -> Some a
| false -> None)
let getBalance (account: Account option) =
account |> Option.bind (fun a -> a.Balance())
// -- Tests ----
let should_equal expected actual =
if expected = actual then
Ok expected
else
Error (expected, actual)
let should_not_equal expected actual =
if expected <> actual then
Ok expected
else
Error (expected, actual)
let ``Returns empty balance after opening`` =
let account = mkBankAccount() |> openAccount
getBalance account |> should_equal (Some 0.0m)
let ``Check basic balance`` =
let account = mkBankAccount() |> openAccount
let openingBalance = account |> getBalance
let updatedBalance =
account
|> updateBalance 10.0m
|> getBalance
openingBalance |> should_equal (Some 0.0m),
updatedBalance |> should_equal (Some 10.0m)
let ``Balance can increment or decrement`` =
let account = mkBankAccount() |> openAccount
let openingBalance = account |> getBalance
let addedBalance =
account
|> updateBalance 10.0m
|> getBalance
let subtractedBalance =
account
|> updateBalance -15.0m
|> getBalance
openingBalance |> should_equal (Some 0.0m),
addedBalance |> should_equal (Some 10.0m),
subtractedBalance |> should_equal (Some -5.0m)
let ``Account can be closed`` =
let account =
mkBankAccount()
|> openAccount
|> closeAccount
getBalance account |> should_equal None,
account |> should_not_equal None
#time
let ``Account can be updated from multiple threads`` =
let account =
mkBankAccount()
|> openAccount
let updateAccountAsync =
async {
account
|> updateBalance 1.0m
|> ignore
}
let nb = 10 // 👈 10 is quick (2ms), 20 is so long (9s)
updateAccountAsync
|> List.replicate nb
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
getBalance account |> should_equal (Some (decimal nb))
#time

您的问题是您的代码没有一直使用Async。

Account类具有方法OpenCloseBalanceTransaction,并且您使用了AsyncReplyChannel,但是您使用PostAndReply发送消息。这意味着:您向MailboxProcessor发送一条消息,其中包含一个要回复的通道。但是,在这一点上,该方法会同步等待完成。

即使使用Async.Parallel和多个线程,也可能意味着许多线程会锁定自己。如果你改变使用PostAndAsyncReply的所有方法,那么问题就消失了。

还有另外两种性能优化可以提高性能,但在您的示例中并不重要。

  1. 调用列表的长度是错误的。要计算列表的长度,必须遍历整个列表。只有你在Transaction中使用此选项可以打印长度,但要考虑事务列表是否变长。你总是要经历整个列表,无论何时添加事务。这将是您的交易列表的O(N(。

  2. 调用(List.sum(也是如此。无论何时调用Balance,都必须计算当前Balance。也作O(N(。

由于您有MailboxProcessor,您也可以计算这两个值,而不是一次又一次地完全重新计算这些值。因此,它们变成O(1(运算。

最重要的是,我会将OpenCloseTransaction消息更改为不返回任何消息,因为在我看来,它们返回任何消息都没有意义。你的例子甚至让我对bool返回的内容感到困惑值甚至是平均值。

Close消息中,在将其设置为false之前,您将返回state.Opened。为什么?

Open消息中,返回否定的state.Opened。你以后怎么用它,只是看起来不对。

如果bool背后有更多的含义,请用它制作一个不同的歧视并集,描述它返回的目的。

你在整个代码中使用了option<Acount>,我删除了它,因为我看不出它有任何用途

无论如何,这里有一个完整的例子,说明我将如何编写没有速度问题的代码。


type Message =
| Open
| Close
| Balance     of AsyncReplyChannel<decimal option>
| Transaction of decimal
type AccountState = {
Opened:             bool
Transactions:       decimal list
TransactionsLength: int
CurrentBalance:     decimal
}
type Account() =
let agent = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (state: AccountState) = async {
match! inbox.Receive() with
| Close ->
printfn "Closing"
return! loop { state with Opened = false }
| Open ->
printfn "Opening"
return! loop { state with Opened = true }
| Transaction tran ->
let l = state.TransactionsLength + 1
printfn $"Adding transaction {tran}, nb = {l}"
if state.Opened then
return! loop {
state with
Transactions       = tran :: state.Transactions
TransactionsLength = l
CurrentBalance     = state.CurrentBalance + tran
}
else
return! loop state
| Balance channel ->
if   state.Opened
then channel.Reply (Some state.CurrentBalance)
else channel.Reply  None
return! loop state
}
let defaultAccount = {
Opened             = false
Transactions       = []
TransactionsLength = 0
CurrentBalance     = 0m
}
loop defaultAccount
)
member _.Open        ()          = agent.Post(Open)
member _.Close       ()          = agent.Post(Close)
member _.Balance     ()          = agent.PostAndAsyncReply(Balance)
member _.Transaction transaction = agent.Post(Transaction transaction)
(* Test *)
let should_equal expected actual =
if expected = actual then
Ok expected
else
Error (expected, actual)
(* --- API --- *)
let mkBankAccount = Account
(* Opens the Account *)
let openAccount  (account: Account) =
account.Open ()
(* Closes the Account *)
let closeAccount (account: Account) =
account.Close ()
(* Updates Account *)
let updateBalance transaction (account: Account) =
account.Transaction(transaction)
(* Gets the current Balance *)
let getBalance (account: Account) =
account.Balance ()
#time
let ``Account can be updated from multiple threads`` =
let account = mkBankAccount ()
openAccount account
let updateBalanceAsync = async {
updateBalance 1.0m account
}
let nb = 50
List.replicate nb updateBalanceAsync
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
Async.RunSynchronously (async {
let! balance = getBalance account
printfn "Balance is %A should be (Some %f)" balance (1.0m * decimal nb)
})
#time

最新更新