为什么我必须将一个异步<T>包装到另一个异步工作流中,然后让!它?



我试图理解 F# 中的异步工作流,但我发现了一个我真的不明白的部分。

以下代码工作正常:

let asynWorkflow = async{
let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask 
return result
} 
let stream = Async.RunSynchronously asynWorkflow
|> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

我定义了一个异步工作流,其中 TryOpenAsync 返回Task<StreamOpenResult>类型。我用Async.AwaitTask将其转换为Async<StreamOpenResult>。(支线任务:"等待"任务?它不会等待它只是转换它,是吗?我认为这与Task.Wait或await关键字无关)。我用let!"等待"它并归还它。 为了启动工作流,我使用 RunSyncly,它应该启动工作流并返回结果(绑定它)。在结果上,我检查是否找到流。

但现在是我的第一个问题。为什么我必须将 TryOpenAsync 调用包装在另一个异步计算中,然后让!("等待")它? 例如,以下代码不起作用:

let asynWorkflow =  Stream.TryOpenAsync(partition) |> Async.AwaitTask  
let stream = Async.RunSynchronously asynWorkflow
|> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

我认为AwaitTask使它成为一种Async<T>RunSynchronously应该启动它。然后使用结果。我错过了什么?

我的第二个问题是为什么有任何"Async.Let!"功能可用?也许是因为它不起作用或更好,为什么它不适用于以下代码?

let ``let!`` task = async{
let! result = task |> Async.AwaitTask 
return result
} 
let stream = Async.RunSynchronously ( ``let!`` (Stream.TryOpenAsync(partition))  )
|> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

我只是插入 TryOpenAsync 作为参数,但它不起作用。通过说不起作用,我的意思是整个 FSI 将挂起。所以它与我的异步/"等待"有关。

---更新:

FSI 中工作代码的结果:

>
Real: 00:00:00.051, CPU: 00:00:00.031, GC gen0: 0, gen1: 0, gen2: 0
val asynWorkflow : Async<StreamOpenResult>
val stream : Stream

FSI 中代码不起作用的结果:

>

而且您不能再在 FSI 中执行任何内容

---更新2

我正在使用Streamstone。下面是 C# 示例:https://github.com/yevhen/Streamstone/blob/master/Source/Example/Scenarios/S04_Write_to_stream.cs

这里是 Stream.TryOpenAsync: https://github.com/yevhen/Streamstone/blob/master/Source/Streamstone/Stream.Api.cs#L192

我无法告诉你为什么第二个示例在不知道Streampartition是什么以及它们如何工作的情况下不起作用。

但是,我想借此机会指出,这两个例子并不严格等同

F#async有点像要做什么的"配方"。当你写async { ... }时,结果的计算只是坐在那里,实际上什么都不做。这更像是声明一个函数,而不是发出命令。只有当你通过调用类似Async.RunSynchronouslyAsync.Start的东西来"启动"它时,它才会真正运行。一个推论是,您可以多次启动相同的异步工作流,并且每次都会是一个新的工作流。与IEnumerable的工作方式非常相似。

另一方面,C#Task更像是对已经在运行的异步计算的"引用"。一旦您调用Stream.TryOpenAsync(partition),计算就会开始,并且在任务实际开始之前不可能获得Task实例。您可以多次await生成的Task,但每次await都会导致重新尝试打开流。只有第一个await会真正等待任务完成,而每个后续只会返回相同的记住结果。

在异步/反应式术语中,F#async是所谓的"冷",而 C#Task称为"热"。

第二个代码块看起来应该对我有用。如果我为StreamStreamOpenResult提供虚拟实现,它确实会运行它。

应尽可能避免使用Async.RunSynchronously,因为它违背了异步的目的。将所有这些代码放在一个更大的async块中,然后您将可以访问StreamOpenResult

async {
let! openResult = Stream.TryOpenAsync(partition) |> Async.AwaitTask  
let stream = if openResult.Found then openResult.Stream else Stream(partition)
return () // now do something with the stream
}

您可能需要在程序的最外边缘放置一个Async.StartAsync.RunSynchronously才能实际运行它,但是如果您有async(或将其转换为Task)并将其传递给其他代码(例如Web框架)会更好,这些代码可以以非阻塞方式调用它。

不是我想用另一个问题来回答你的问题,而是:你为什么要做这样的代码?这可能有助于理解它。为什么不只是:

let asyncWorkflow = async {
let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask 
if result.Found then return openResult.Stream else return Stream(partition) }

创建异步工作流只是为了立即调用RunSynchronously是没有意义的 - 这类似于在Task上调用.Result- 它只是阻止当前线程,直到工作流返回。

相关内容

  • 没有找到相关文章

最新更新