我知道PipeTo
,但有些东西,比如同步等待嵌套延续,似乎与async&等等。
所以,我的第一个问题[1]是:这里有什么"魔力"吗,这样我们就可以同步地等待嵌套任务的延续,而它最终仍然是异步的?
当我们处于async&等待差异,如何处理失败?
让我们创建一个简单的示例:
public static class AsyncOperations
{
public async static Task<int> CalculateAnswerAsync()
{
await Task.Delay(1000).ConfigureAwait(false);
throw new InvalidOperationException("Testing!");
//return 42;
}
public async static Task<string> ConvertAsync(int number)
{
await Task.Delay(600).ConfigureAwait(false);
return number + " :)";
}
}
在"正则"中,async&等待方式:
var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);
正如您所期望的那样,第一次操作会出现异常。
现在,让我们创建一个将处理这些异步操作的actor。为了便于论证,假设CalculateAnswerAsync
和ConvertAsync
应该一个接一个地用作一个完整操作(例如,如果您只想向流写入一行,则类似于StreamWriter.WriteLineAsync
和StreamWriter.FlushAsync
)。
public sealed class AsyncTestActor : ReceiveActor
{
public sealed class Start
{
}
public sealed class OperationResult
{
private readonly string message;
public OperationResult(string message)
{
this.message = message;
}
public string Message
{
get { return message; }
}
}
public AsyncTestActor()
{
Receive<Start>(msg =>
{
AsyncOperations.CalculateAnswerAsync()
.ContinueWith(result =>
{
var number = result.Result;
var conversionTask = AsyncOperations.ConvertAsync(number);
conversionTask.Wait(1500);
return new OperationResult(conversionTask.Result);
})
.PipeTo(Self);
});
Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
}
}
如果没有例外的话,我仍然可以毫无问题地获得Got 42 :)
,这让我回到了[1]上面的"神奇"点。此外,示例中提供的AttachedToParent
和ExecuteSynchronously
标志是可选的,还是几乎需要它们才能使一切按预期工作?它们似乎对异常处理没有任何影响。。。
现在,如果CalculateAnswerAsync
抛出异常,也就是说result.Result
抛出AggregateException
,那么它几乎被毫无痕迹地吞噬了。
如果有可能的话,我应该怎么做才能让异步操作中的异常像"常规"异常那样崩溃?
TPL中错误处理的乐趣:)
一旦Task开始在自己的线程上运行,它内部发生的一切都已经与调用者异步了,包括错误处理
- 当您在演员体内启动第一个
Task
时,该任务在ThreadPool
上独立于演员运行。这意味着你在Task
中所做的任何事情都将与你的actor异步,因为它运行在不同的线程上。这就是为什么我在你帖子顶部链接的PipeTo
样本中打了一个Task.Wait
电话。对演员来说没有什么区别——它只是看起来像一个长期运行的任务 - 异常-如果内部任务失败,
conversionTask.Result
属性将抛出在运行过程中捕获的异常,因此您需要在Task
中添加一些错误处理,以确保您的参与者收到出错通知。注意,我在这里就是这么做的:https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117-如果你把你的例外变成你的演员可以处理的信息:鸟儿开始唱歌,彩虹发光,TPL错误不再是痛苦和痛苦的来源 - 至于抛出异常时会发生什么
现在,如果CalculateAnswerAsync抛出异常,这意味着后果结果抛出AggregateException,它几乎被吞噬了没有任何痕迹。
AggregateException
将包含内部异常列表——TPL之所以有聚合错误的概念,是因为(a)您有一个任务是聚合中多个任务的延续,即Task.WhenAll
,或者(b)您有错误沿着ContinueWith
链传播回父级。您还可以调用AggregateException.Flatten()
调用,使管理嵌套异常变得更容易。
TPL+Akka.NET的最佳实践
处理TPL中的异常是一件令人讨厌的事情,这是真的,但处理它的最佳方法是在Task
中处理try..catch..
异常,并将它们变成您的参与者可以处理的消息类。
此外,示例中提供的AttachedToParent和ExecuteSynchronously标志是可选的,还是几乎需要它们才能使一切正常工作?
当您在continuations上有continuations时,这主要是一个问题——PipeTo
会自动在自己身上使用这些标志。它对错误处理没有任何影响,但可以确保在与原始Task
相同的线程上立即执行延续。
我建议只有在你进行大量嵌套的延续时才使用这些标志-一旦你进行了1次以上的延续,TPL就开始在如何安排任务方面自由了(事实上,OnlyOnCompleted这样的标志在1次以上延续后就不再被接受了。)
只是为了补充Aaron所说的内容。截至昨天,在使用任务调度器时,我们确实支持参与者内部的安全异步等待。
public class AsyncAwaitActor : ReceiveActor
{
public AsyncAwaitActor()
{
Receive<string>(async m =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
Sender.Tell("done");
});
}
}
public class AskerActor : ReceiveActor
{
public AskerActor(ActorRef other)
{
Receive<string>(async m =>
{
var res = await other.Ask(m);
Sender.Tell(res);
});
}
}
public class ActorAsyncAwaitSpec : AkkaSpec
{
[Fact]
public async Task Actors_should_be_able_to_async_await_ask_message_loop()
{
var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>()
.WithDispatcher("akka.actor.task-dispatcher"),
"Worker");
//IMPORTANT: you must use the akka.actor.task-dispatcher
//otherwise async await is not safe
var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor))
.WithDispatcher("akka.actor.task-dispatcher"),
"Asker");
var res = await asker.Ask("something");
Assert.Equal("done", res);
}
}
这不是我们的默认调度程序,因为它在性能/吞吐量方面确实有价格。如果触发阻塞的任务(例如使用task.Wait()
或task.Result
),也存在死锁的风险因此,PipeTo
模式仍然是首选方法,因为它更适用于行动者模型。但是,如果您真的需要进行一些TPL集成,异步等待支持是一个额外的工具。
此功能实际上在封面下使用了PipeTo
。它将继续执行每个任务,并将其封装在一条特殊消息中,然后将该消息传递回参与者,并在参与者自己的并发上下文中执行该任务。