我试图理解 F# 中的异步工作流,但我发现了一个我真的不明白的部分。
以下代码工作正常:
let asynWorkflow = async{
let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask
return result
}
let stream = Async.RunSynchronously asynWorkflow
|> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)
我定义了一个异步工作流,其中 TryOpenAsync 返回Task<StreamOpenResult>
类型。我用Async.AwaitTask将其转换为Async<StreamOpenResult>
。(支线任务:"等待"任务?它不会等待它只是转换它,是吗?我认为这与Task.Wait或await关键字无关)。我用let!
"等待"它并归还它。 为了启动工作流,我使用 RunSyncly,它应该启动工作流并返回结果(绑定它)。在结果上,我检查是否找到流。
但现在是我的第一个问题。为什么我必须将 TryOpenAsync 调用包装在另一个异步计算中,然后让!("等待")它? 例如,以下代码不起作用:
let asynWorkflow = Stream.TryOpenAsync(partition) |> Async.AwaitTask
let stream = Async.RunSynchronously asynWorkflow
|> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)
我认为AwaitTask使它成为一种Async<T>
,RunSynchronously
应该启动它。然后使用结果。我错过了什么?
我的第二个问题是为什么有任何"Async.Let!"功能可用?也许是因为它不起作用或更好,为什么它不适用于以下代码?
let ``let!`` task = async{
let! result = task |> Async.AwaitTask
return result
}
let stream = Async.RunSynchronously ( ``let!`` (Stream.TryOpenAsync(partition)) )
|> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)
我只是插入 TryOpenAsync 作为参数,但它不起作用。通过说不起作用,我的意思是整个 FSI 将挂起。所以它与我的异步/"等待"有关。
---更新:
FSI 中工作代码的结果:
>
Real: 00:00:00.051, CPU: 00:00:00.031, GC gen0: 0, gen1: 0, gen2: 0
val asynWorkflow : Async<StreamOpenResult>
val stream : Stream
FSI 中代码不起作用的结果:
>
而且您不能再在 FSI 中执行任何内容
---更新2
我正在使用Streamstone。下面是 C# 示例:https://github.com/yevhen/Streamstone/blob/master/Source/Example/Scenarios/S04_Write_to_stream.cs
这里是 Stream.TryOpenAsync: https://github.com/yevhen/Streamstone/blob/master/Source/Streamstone/Stream.Api.cs#L192
我无法告诉你为什么第二个示例在不知道Stream
和partition
是什么以及它们如何工作的情况下不起作用。
但是,我想借此机会指出,这两个例子并不严格等同。
F#async
有点像要做什么的"配方"。当你写async { ... }
时,结果的计算只是坐在那里,实际上什么都不做。这更像是声明一个函数,而不是发出命令。只有当你通过调用类似Async.RunSynchronously
或Async.Start
的东西来"启动"它时,它才会真正运行。一个推论是,您可以多次启动相同的异步工作流,并且每次都会是一个新的工作流。与IEnumerable
的工作方式非常相似。
另一方面,C#Task
更像是对已经在运行的异步计算的"引用"。一旦您调用Stream.TryOpenAsync(partition)
,计算就会开始,并且在任务实际开始之前不可能获得Task
实例。您可以多次await
生成的Task
,但每次await
都会导致重新尝试打开流。只有第一个await
会真正等待任务完成,而每个后续只会返回相同的记住结果。
在异步/反应式术语中,F#async
是所谓的"冷",而 C#Task
称为"热"。
第二个代码块看起来应该对我有用。如果我为Stream
和StreamOpenResult
提供虚拟实现,它确实会运行它。
应尽可能避免使用Async.RunSynchronously
,因为它违背了异步的目的。将所有这些代码放在一个更大的async
块中,然后您将可以访问StreamOpenResult
:
async {
let! openResult = Stream.TryOpenAsync(partition) |> Async.AwaitTask
let stream = if openResult.Found then openResult.Stream else Stream(partition)
return () // now do something with the stream
}
您可能需要在程序的最外边缘放置一个Async.Start
或Async.RunSynchronously
才能实际运行它,但是如果您有async
(或将其转换为Task
)并将其传递给其他代码(例如Web框架)会更好,这些代码可以以非阻塞方式调用它。
不是我想用另一个问题来回答你的问题,而是:你为什么要做这样的代码?这可能有助于理解它。为什么不只是:
let asyncWorkflow = async {
let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask
if result.Found then return openResult.Stream else return Stream(partition) }
创建异步工作流只是为了立即调用RunSynchronously
是没有意义的 - 这类似于在Task
上调用.Result
- 它只是阻止当前线程,直到工作流返回。