Timeout an async method implemented with TaskCompletionSource



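I have a black-box object that exposes a method to kick off an async operation, and an event fires when the operation completes. I have wrapped that in a Task<OpResult> BlackBoxOperationAysnc() method using TaskCompletionSource, and that works well.

However, in that async wrapper I'd like to complete the async call with a timeout error if the event is not received within a given timeout. Currently I manage it with a timer, like this: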

public Task<OpResult> BlackBoxOperationAysnc() {
    var tcs = new TaskCompletionSource<OpResult>();
    const int timeoutMs = 20000;
    Timer timer = new Timer(_ => tcs.TrySetResult(OpResult.Timeout),
                            null, timeoutMs, Timeout.Infinite);
    EventHandler<EndOpEventArgs> eventHandler = (sender, args) => {
        ...
        tcs.TrySetResult(OpResult.BlarBlar);
    };
    blackBox.EndAsyncOpEvent += eventHandler;
    blackBox.StartAsyncOp();
    return tcs.Task;
}

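Is that the only way to manage such timeouts? Is there a way to do it without setting up my own timer? I can't see anything timeout-related built into TaskCompletionSource.

You could use a CancellationTokenSource with a timeout. Use it together with your TaskCompletionSource like this.

For example: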

public Task<OpResult> BlackBoxOperationAysnc() {
    var tcs = new TaskCompletionSource<OpResult>();
    const int timeoutMs = 20000;
    var ct = new CancellationTokenSource(timeoutMs);
    ct.Token.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: false);
    EventHandler<EndOpEventArgs> eventHandler = (sender, args) => {
        ...
        tcs.TrySetResult(OpResult.BlarBlar);
    };
    blackBox.EndAsyncOpEvent += eventHandler;
    blackBox.StartAsyncOp();
    return tcs.Task;
}
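
Note that with TrySetCanceled the returned task ends up cancelled, so awaiting it throws a TaskCanceledException instead of yielding OpResult.Timeout as the timer version in the question did. Below is a minimal sketch of keeping the question's behaviour while still using CancellationTokenSource. The BlackBox and EndOpEventArgs classes, the OpResult members, and the RunWithTimeoutAsync name are hypothetical stand-ins, since the real black box is not shown.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the types mentioned in the question.
public enum OpResult { Success, Timeout }

public class EndOpEventArgs : EventArgs { }

public class BlackBox
{
    private readonly int _delayMs;
    public BlackBox(int delayMs) { _delayMs = delayMs; }

    public event EventHandler<EndOpEventArgs> EndAsyncOpEvent;

    // Simulates an operation that raises the completion event after a delay.
    public void StartAsyncOp()
    {
        Task.Delay(_delayMs).ContinueWith(_ =>
            EndAsyncOpEvent?.Invoke(this, new EndOpEventArgs()));
    }
}

public static class BlackBoxExtensions
{
    public static Task<OpResult> RunWithTimeoutAsync(this BlackBox blackBox, int timeoutMs)
    {
        var tcs = new TaskCompletionSource<OpResult>();
        var cts = new CancellationTokenSource(timeoutMs);

        // On timeout, complete with a result instead of cancelling the task.
        var registration = cts.Token.Register(
            () => tcs.TrySetResult(OpResult.Timeout), useSynchronizationContext: false);

        EventHandler<EndOpEventArgs> handler =
            (sender, args) => tcs.TrySetResult(OpResult.Success);
        blackBox.EndAsyncOpEvent += handler;

        // Whichever side wins, release the event subscription and the timer.
        tcs.Task.ContinueWith(_ =>
        {
            blackBox.EndAsyncOpEvent -= handler;
            registration.Dispose();
            cts.Dispose();
        }, TaskScheduler.Default);

        blackBox.StartAsyncOp();
        return tcs.Task;
    }
}
```

Because both paths use TrySetResult, whichever fires second is simply ignored, and the continuation cleans up in either case.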

Updated, here's a complete functional example:

using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
    public class Program
    {
        // .NET 4.5/C# 5.0: convert EAP pattern into TAP pattern with timeout
        public async Task<AsyncCompletedEventArgs> BlackBoxOperationAsync(
            object state,
            CancellationToken token,
            int timeout = Timeout.Infinite)
        {
            var tcs = new TaskCompletionSource<AsyncCompletedEventArgs>();
            using (var cts = CancellationTokenSource.CreateLinkedTokenSource(token))
            {
                // prepare the timeout
                if (timeout != Timeout.Infinite)
                {
                    cts.CancelAfter(timeout);
                }
                // handle completion
                AsyncCompletedEventHandler handler = (sender, args) =>
                {
                    if (args.Cancelled)
                        tcs.TrySetCanceled();
                    else if (args.Error != null)
                        tcs.TrySetException(args.Error);
                    else
                        tcs.TrySetResult(args);
                };
                this.BlackBoxOperationCompleted += handler;
                try
                {
                    using (cts.Token.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: false))
                    {
                        this.StartBlackBoxOperation(state);
                        return await tcs.Task.ConfigureAwait(continueOnCapturedContext: false);
                    }
                }
                finally
                {
                    this.BlackBoxOperationCompleted -= handler;
                }
            }
        }
        // emulate async operation
        AsyncCompletedEventHandler BlackBoxOperationCompleted = delegate { };
        void StartBlackBoxOperation(object state)
        {
            ThreadPool.QueueUserWorkItem(s =>
            {
                Thread.Sleep(1000);
                this.BlackBoxOperationCompleted(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: state));
            }, state);
        }
        // test
        static void Main()
        {
            try
            {
                new Program().BlackBoxOperationAsync(null, CancellationToken.None, 1200).Wait();
                Console.WriteLine("Completed.");
                new Program().BlackBoxOperationAsync(null, CancellationToken.None, 900).Wait();
            }
            catch (Exception ex)
            {
                while (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }
            Console.ReadLine();
        }
    }
}

A .NET 4.0/C# 4.0 version can be found here; it takes advantage of the compiler-generated IEnumerator state machine.

You can use the Task extension from here (https://stackoverflow.com/a/22078975/2680660), which also uses CancellationTokenSource.

Slightly modified:

public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, TimeSpan timeout)
{
    using (var timeoutCancellationTokenSource = new CancellationTokenSource())
    {
        var completedTask = await Task.WhenAny(task, Task.Delay(timeout, timeoutCancellationTokenSource.Token));
        if (completedTask == task)
        {
            timeoutCancellationTokenSource.Cancel();
            return await task;  // Very important in order to propagate exceptions
        }
        else
        {
            throw new TimeoutException($"{nameof(TimeoutAfter)}: The operation has timed out after {timeout:mm\\:ss}");
        }
    }
}

public Task<OpResult> BlackBoxOperationAysnc()
{
    var tcs = new TaskCompletionSource<OpResult>();
    EventHandler<EndOpEventArgs> eventHandler = (sender, args) => {
        ...
        tcs.TrySetResult(OpResult.BlarBlar);
    };
    blackBox.EndAsyncOpEvent += eventHandler;
    blackBox.StartAsyncOp();
    return tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(20));
}
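
On .NET 6 or later, the built-in Task.WaitAsync(TimeSpan) method offers similar behaviour to the TimeoutAfter extension above: it throws a TimeoutException if the task has not completed in time. The sketch below assumes .NET 6+, and WaitAsyncDemo, SlowOperationAsync and RunAsync are hypothetical names used only to keep it self-contained.

```csharp
using System;
using System.Threading.Tasks;

public static class WaitAsyncDemo
{
    // Stand-in for a TaskCompletionSource-backed operation that finishes after delayMs.
    public static Task<string> SlowOperationAsync(int delayMs)
    {
        var tcs = new TaskCompletionSource<string>();
        Task.Delay(delayMs).ContinueWith(_ => tcs.TrySetResult("done"));
        return tcs.Task;
    }

    public static async Task<string> RunAsync(int delayMs, TimeSpan timeout)
    {
        try
        {
            // WaitAsync throws TimeoutException if the task is still pending after the timeout.
            return await SlowOperationAsync(delayMs).WaitAsync(timeout);
        }
        catch (TimeoutException)
        {
            return "timed out";
        }
    }
}
```

As with TimeoutAfter, this only abandons the wait. The underlying operation keeps running, so any event handler on the black box still needs to be unsubscribed separately.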
