具有超时的 F# 异步工作流



我真的很喜欢F#的async工作流,但对我来说,它有一个严重的问题:它不允许创建不应超过某个特定时间跨度的工作流。

为了更清楚,这是我为自己编写的一个简单的函数:

let withTimeout operation timeout = async {
try
return Some <| Async.RunSynchronously (operation, timeout)
with :? TimeoutException -> return None
}

即签名是

val withTimeout : operation:Async<'a> -> timeout:int -> Async<'a option>

此处的示例用法:

let op = async { 
do! Async.Sleep(1000) 
return 1
}
#time
withTimeout op 2000 |> Async.RunSynchronously;;
// Real: 00:00:01.116, CPU: 00:00:00.015, GC gen0: 0, gen1: 0, gen2: 0
// val it : unit option = Some 1
withTimeout op 2000 |> Async.RunSynchronously;;
// Real: 00:00:01.004, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
// val it : unit option = Some 1
withTimeout op 500 |> Async.RunSynchronously;;
// Real: 00:00:00.569, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
// val it : unit option = None    

您可以看到,它按预期工作。这非常好,但也有点尴尬,我不确定它的安全性和其他可能出现的问题。也许我正在重新发明轮子,并且有漂亮而简洁的方法来编写这样的工作流程?

UPD:目前最好的选择是由Vesa A.J.K在这里提出的:https://stackoverflow.com/a/26230245/1554463。通过我的编辑,它是这样的:

type Async with
static member WithTimeout (timeout : int option) operation = 
match timeout with
| Some time  when time > 0 -> 
async { 
let! child = Async.StartChild (operation, time) 
try 
let! result = child 
return Some result
with :? TimeoutException -> return None 
}
| _ -> 
async { 
let! result = operation
return Some result
}

这是另一个选项:

type Async with
static member WithCancellation (token:CancellationToken) operation = 
async {
try
let task = Async.StartAsTask (operation, cancellationToken = token)
task.Wait ()
return Some task.Result
with 
| :? TaskCanceledException -> return None
| :? AggregateException -> return None
}
static member WithTimeout (timeout:int option) operation = 
match timeout with
| Some(time) -> 
async {
use tokenSource = new CancellationTokenSource (time)
return! operation |> Async.WithCancellation tokenSource.Token
}
| _ -> 
async { 
let! res = operation
return Some res
}

在这里,我使用 .Net 任务和CancellationToken

只需使用Async.StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>

let with_timeout timeout action =
async {
let! child = Async.StartChild( action, timeout )
return! child
}

请看这个Async.WhenAny的实现,它的行为应该类似于Task.WhenAny

通过使用它,您可以实现withTimeout,类似于此处为Task实现它的方式:

let withTimeout dueTime comp =
let success = async {
let! x = comp
return (Some x)
}
let timeout = async {
do! Async.Delay(dueTime)
return None
}
Async.WhenAny(success, timeout)

我不相信它会在第一个计算完成后取消其他计算,但至少这个实现不会不必要地阻塞线程。

最新更新