参与者和异常内部的异步API调用



我知道PipeTo,但有些东西,比如同步等待嵌套延续,似乎与async&等等。

所以,我的第一个问题[1]是:这里有什么"魔力"吗,这样我们就可以同步地等待嵌套任务的延续,而它最终仍然是异步的?

当我们处于async&等待差异,如何处理失败?

让我们创建一个简单的示例:

public static class AsyncOperations
{
    public async static Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(1000).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
        //return 42;
    }
    public async static Task<string> ConvertAsync(int number)
    {
        await Task.Delay(600).ConfigureAwait(false);
        return number + " :)";
    }
}

在"正则"中,async&等待方式:

var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);

正如您所期望的那样,第一次操作会出现异常。

现在,让我们创建一个将处理这些异步操作的actor。为了便于论证,假设CalculateAnswerAsyncConvertAsync应该一个接一个地用作一个完整操作(例如,如果您只想向流写入一行,则类似于StreamWriter.WriteLineAsyncStreamWriter.FlushAsync)。

public sealed class AsyncTestActor : ReceiveActor
{
    public sealed class Start
    {
    }
    public sealed class OperationResult
    {
        private readonly string message;
        public OperationResult(string message)
        {
            this.message = message;
        }
        public string Message
        {
            get { return message; }
        }
    }
    public AsyncTestActor()
    {
        Receive<Start>(msg =>
               {
                   AsyncOperations.CalculateAnswerAsync()
                     .ContinueWith(result =>
                            {
                                var number = result.Result;
                                var conversionTask = AsyncOperations.ConvertAsync(number);
                                conversionTask.Wait(1500);
                                return new OperationResult(conversionTask.Result);
                            })
                     .PipeTo(Self);
                });
        Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
    }
}

如果没有例外的话,我仍然可以毫无问题地获得Got 42 :),这让我回到了[1]上面的"神奇"点。此外,示例中提供的AttachedToParentExecuteSynchronously标志是可选的,还是几乎需要它们才能使一切按预期工作?它们似乎对异常处理没有任何影响。。。

现在,如果CalculateAnswerAsync抛出异常,也就是说result.Result抛出AggregateException,那么它几乎被毫无痕迹地吞噬了。

如果有可能的话,我应该怎么做才能让异步操作中的异常像"常规"异常那样崩溃?

TPL中错误处理的乐趣:)

一旦Task开始在自己的线程上运行,它内部发生的一切都已经与调用者异步了,包括错误处理

  1. 当您在演员体内启动第一个Task时,该任务在ThreadPool上独立于演员运行。这意味着你在Task中所做的任何事情都将与你的actor异步,因为它运行在不同的线程上。这就是为什么我在你帖子顶部链接的PipeTo样本中打了一个Task.Wait电话。对演员来说没有什么区别——它只是看起来像一个长期运行的任务
  2. 异常-如果内部任务失败,conversionTask.Result属性将抛出在运行过程中捕获的异常,因此您需要在Task中添加一些错误处理,以确保您的参与者收到出错通知。注意,我在这里就是这么做的:https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117-如果你把你的例外变成你的演员可以处理的信息:鸟儿开始唱歌,彩虹发光,TPL错误不再是痛苦和痛苦的来源
  3. 至于抛出异常时会发生什么

现在,如果CalculateAnswerAsync抛出异常,这意味着后果结果抛出AggregateException,它几乎被吞噬了没有任何痕迹。

AggregateException将包含内部异常列表——TPL之所以有聚合错误的概念,是因为(a)您有一个任务是聚合中多个任务的延续,即Task.WhenAll,或者(b)您有错误沿着ContinueWith链传播回父级。您还可以调用AggregateException.Flatten()调用,使管理嵌套异常变得更容易。

TPL+Akka.NET的最佳实践

处理TPL中的异常是一件令人讨厌的事情,这是真的,但处理它的最佳方法是在Task中处理try..catch..异常,并将它们变成您的参与者可以处理的消息类。

此外,示例中提供的AttachedToParent和ExecuteSynchronously标志是可选的,还是几乎需要它们才能使一切正常工作?

当您在continuations上有continuations时,这主要是一个问题——PipeTo会自动在自己身上使用这些标志。它对错误处理没有任何影响,但可以确保在与原始Task相同的线程上立即执行延续。

我建议只有在你进行大量嵌套的延续时才使用这些标志-一旦你进行了1次以上的延续,TPL就开始在如何安排任务方面自由了(事实上,OnlyOnCompleted这样的标志在1次以上延续后就不再被接受了。)

只是为了补充Aaron所说的内容。截至昨天,在使用任务调度器时,我们确实支持参与者内部的安全异步等待。

public class AsyncAwaitActor : ReceiveActor
{
    public AsyncAwaitActor()
    {
        Receive<string>(async m =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Sender.Tell("done");
        });
    }
}
public class AskerActor : ReceiveActor
{
    public AskerActor(ActorRef other)
    {
        Receive<string>(async m =>
        {
            var res = await other.Ask(m);
            Sender.Tell(res);
        });
    }
}
public class ActorAsyncAwaitSpec : AkkaSpec
{
    [Fact]
    public async Task Actors_should_be_able_to_async_await_ask_message_loop()
    {
        var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>()
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Worker");
        //IMPORTANT: you must use the akka.actor.task-dispatcher
        //otherwise async await is not safe
        var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor))
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Asker");
        var res = await asker.Ask("something");
        Assert.Equal("done", res);
    }
}

这不是我们的默认调度程序,因为它在性能/吞吐量方面确实有价格。如果触发阻塞的任务(例如使用task.Wait()task.Result),也存在死锁的风险因此,PipeTo模式仍然是首选方法,因为它更适用于行动者模型。但是,如果您真的需要进行一些TPL集成,异步等待支持是一个额外的工具。

此功能实际上在封面下使用了PipeTo。它将继续执行每个任务,并将其封装在一条特殊消息中,然后将该消息传递回参与者,并在参与者自己的并发上下文中执行该任务。

最新更新