正在创建超时的任务



我想执行一系列任务,每个任务都有自己的超时时间。

我从这里借用了创建超时任务的扩展方法http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx

所以代码低于

public static Task TimeoutAfter(this Task task, int millisecondsTimeout)
{
// Short-circuit #1: infinite timeout or task already completed
if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
{
// Either the task has already completed or timeout will never occur.
// No proxy necessary.
return task;
}
// tcs.Task will be returned as a proxy to the caller
TaskCompletionSource<VoidTypeStruct> tcs = new TaskCompletionSource<VoidTypeStruct>();
// Short-circuit #2: zero timeout
if (millisecondsTimeout == 0)
{
// We've already timed out.
tcs.SetException(new TimeoutException());
return tcs.Task;
}
// Set up a timer to complete after the specified timeout period
Timer timer = new Timer(state =>
{
// Recover your state information
var myTcs = (TaskCompletionSource<VoidTypeStruct>)state;
// Fault our proxy with a TimeoutException
myTcs.TrySetException(new TimeoutException());
}, tcs, millisecondsTimeout, Timeout.Infinite);
// Wire up the logic for what happens when source task completes
task.ContinueWith(antecedent =>
{
timer.Dispose(); // Cancel the timer
MarshalTaskResults(antecedent, tcs); // Marshal results to proxy
},
CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
return tcs.Task;
}
public class Program
{
private static List<int> Output = new List<int>();
private static Random _random = new Random();
public static void LongRunningTask(string message)
{
Console.WriteLine(message);
Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);            
//Simulate a long running task
Thread.Sleep(TimeSpan.FromSeconds(3));
var number = _random.Next();
Console.WriteLine("Adding " + number);
Output.Add(number);
}
public static void Main(string[] args)
{
var tasks = new List<Task>();
var t1 = Task.Factory.StartNew(_ => LongRunningTask("Entering task1"),TaskCreationOptions.AttachedToParent).TimeoutAfter(10);
var t2 = Task.Factory.StartNew(_ => LongRunningTask("Entering task2"),TaskCreationOptions.AttachedToParent);
var t3 = Task.Factory.StartNew(_ => LongRunningTask("Entering task3"),TaskCreationOptions.AttachedToParent);
tasks.Add(t1);
tasks.Add(t2);
tasks.Add(t3);
try
{
Task.WaitAll(tasks.ToArray());
}
catch (Exception ex)
{
Console.WriteLine("There was an exception");
Console.WriteLine(ex.InnerException.Message);
}
Console.WriteLine("Output :");
Output.ForEach(_ => Console.WriteLine(_));
Console.ReadLine();
}
}

the output 
Entering task1
Managed thread Id 10
Entering task2
Managed thread Id 11
Entering task3
Managed thread Id 14
Adding 453738994
Adding 156432981
Adding 1340619865
There was an exception
The operation has timed out.
Output :
453738994
156432981
1340619865

现在,我不明白的是,为什么t1仍然在完成,尽管我已经指定了一个超时,并且发生了超时异常。

我正在使用.net 4。

编辑:

确保超时任务在超时后不执行任何操作,即完全取消任务。

public class Program
{
private static List<int> Output = new List<int>();
private static Random _random = new Random();
public static int LongRunningTask(string message)
{
Console.WriteLine(message);
Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);            
//Simulate a long running task
Thread.Sleep(TimeSpan.FromSeconds(2));
var number = _random.Next();
Console.WriteLine("Adding " + number + " From thread  - " + Thread.CurrentThread.ManagedThreadId);
return number;
}
public static void Main(string[] args)
{
Console.WriteLine("In Main");
Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);
var cts = new CancellationTokenSource();
var tasks = new List<Task>();
var t1 = Task.Factory.StartNew(_ => LongRunningTask("Entering task1"), TaskCreationOptions.AttachedToParent)
.ContinueWith(_ => Output.Add(_.Result),cts.Token)
.TimeoutAfter(1000);
var t2 = Task.Factory.StartNew(_ => LongRunningTask("Entering task2"), TaskCreationOptions.AttachedToParent)
.ContinueWith(_ => Output.Add(_.Result));
var t3 = Task.Factory.StartNew(_ => LongRunningTask("Entering task3"), TaskCreationOptions.AttachedToParent)
.ContinueWith(_ => Output.Add(_.Result));
tasks.Add(t1);
tasks.Add(t2);
tasks.Add(t3);
try
{
Task.WaitAll(tasks.ToArray());
}
catch (Exception ex)
{
Console.WriteLine("There was an exception");
Console.WriteLine(ex.InnerException.Message);
cts.Cancel();
}
Console.WriteLine("Output :");
Output.ForEach(_ => Console.WriteLine(_));
Console.ReadLine();
}
}

输出:

In Main
Managed thread Id 9
Entering task1
Managed thread Id 10
Entering task2
Managed thread Id 11
Entering task3
Managed thread Id 13
Adding 1141027730 From thread  - 10
Adding 1856518562 From thread  - 13
Adding 1856518562 From thread  - 11
There was an exception
The operation has timed out.
Output :
1141027730
1856518562
1856518562

Output包含三个值,因为程序等待所有任务Task.WaitAll(tasks.ToArray());,Output是公共字段(因为闭包)

你可以只保留第一个任务,你会看到另一个结果

Entering task1
Managed thread Id 10
There was an exception
The operation has timed out.
Output :
Adding 1923041190
Managed thread Id 10

请注意,Adding已呼叫,但Output中没有该号码。调用Adding是因为LongRunningTask在此任务Task.Factory.StartNew(_ => LongRunningTask("Entering task1"), TaskCreationOptions.AttachedToParent)中工作,并且在不同的线程上引发了异常。此异常不会影响LongRunningTask

编辑:

有几种选择:

  1. 调用t1.Wait,异常将立即重新抛出,您可以取消任务
  2. 使用继续之前调用TimeoutAfter(10)

    var t1 = Task.Factory.StartNew(() => LongRunningTask("Entering task1"))
    .TimeoutAfter(10)
    .ContinueWith(_=> Output.Add(_.Result), cts.Token);
    

Continue只有在完成TimeoutAfterLongRunningTask之后才会执行,但您必须更新TimeoutAfter,您必须返回Task<Result>而不是Task

public static Task<Result> TimeoutAfter<Result>(this Task<Result> task, int millisecondsTimeout)
{
// Short-circuit #1: infinite timeout or task already completed
if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
{
Console.WriteLine("task.IsCompleted");
// Either the task has already completed or timeout will never occur.
// No proxy necessary.
return task;
}
// tcs.Task will be returned as a proxy to the caller
var tcs = new TaskCompletionSource<Result>();
// Short-circuit #2: zero timeout
if (millisecondsTimeout == 0)
{
//                Console.WriteLine("millisecondsTimeout == 0");
// We've already timed out.
tcs.SetException(new TimeoutException());
return tcs.Task;
}
// Set up a timer to complete after the specified timeout period
var timer = new Timer(state => tcs.TrySetException(new TimeoutException()), null, millisecondsTimeout, Timeout.Infinite);
// Wire up the logic for what happens when source task completes
task.ContinueWith(antecedent =>
{
timer.Dispose();
MarshalTaskResults(antecedent, tcs); 
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
return tcs.Task;
}

TimeoutAfter()方法不对底层Task执行任何操作。因此,即使发生超时,Task仍将继续执行并最终完成。

如果不修改LongRunningTask(),就没有好的方法来解决这个问题。如果你可以修改LongRunningTask(),那么你应该做的是让它接受CancellationToken,并在适当的时候检查它。

您的ContinueWith()尝试没有改变任何内容,因为Task仍然完成,所以继续激发。

会有所帮助的是:

var t1 = Task.Factory.StartNew(() => LongRunningTask("Entering task1"))
.TimeoutAfter(1000)
.ContinueWith(t => Output.Add(t.Result), cts.Token);

如果您这样做,那么t1将表示延续,因此如果发生超时,它将出现故障(等待它将引发异常)。如果您不希望这样,请在访问Result之前,在continuation中检查t的状态。

此外,您永远不应该像这样在List上调用Add(),因为Add()不是线程安全的,并且有可能多个线程同时尝试向其添加。要避免这种情况,请使用其中一个并发集合或锁定。

仅供参考,我最终做了类似的事情

var t1 = Task.Factory.StartNew(_ => LongRunningTask("Entering task1"),                              TaskCreationOptions.AttachedToParent)
.TimeoutAfter(1000)
.ContinueWith(_ =>
{
if(!(_.IsCanceled || _.IsFaulted))
Output.Add(_.Result);
}
, cts.Token);

尽管我找到了前面的答案,但我发现使用基于事件的框架很容易实现一个实现。

让我解释一下需求,我必须将所有内容都封装在异步中,这可能需要超过50毫秒的时间,这样用户与屏幕的交互才能保持流畅。因此,我对服务器的所有套接字请求和响应都需要封装。这种类型的编程通常包括请求一些东西,然后得到一个答案——请求和答案不需要像人们所问的那样遵循FIFO,黄金的价格是多少,数据每秒流入几百次,然后问我的账户价值是多少。

这是我的实现,我添加了一些注释,使一些人更容易理解。

internal Task<string[]> RequestAccountNamesAsync() => RequestAccountNamesAsync(-1);
internal Task<string[]> RequestAccountNamesAsync(int millisecondsTimeout) => RequestAccountNamesAsync(CancellationToken.None, millisecondsTimeout);
internal Task<string[]> RequestAccountNamesAsync(CancellationToken token,int millisecondsTimeout = 1000 )
{
var t1 =  Task.Factory.StartNew<string[]>( () =>
{
try
{
//the result type of the Task
string[] result = null;
//local helper function used to hookup the event
void Method(object sender, OnAccountsReceivedArgs ac)
{
this.OnAccountsReceived -= Method;
result = ac.Accounts;
}
//event responsible for reacting on the "on complete event"
this.OnAccountsReceived += Method;

//item responsible for initiating the socket request
clientSocket.reqManagedAccts();
//measure time-out 
DateTimeOffset startTime = DateTimeOffset.Now;
//loop waiting for the result to come from the server
while (result == null)
{
if (millisecondsTimeout > 0 && DateTimeOffset.Now.Subtract(startTime).Milliseconds >= millisecondsTimeout)
throw new TimeoutException();
//if the value of the millisecondsTimeout argument is zero, the thread relinquishes the remainder of its 
// time slice to any thread of equal priority that is ready to run
// If there are no other threads of equal priority that are ready to run, execution of the current thread
// is not suspended. 
Thread.Sleep(0);
}
return result;
}
catch (Exception e)
{                    
//integrate my proprietary logging framework
logger.Enqueue<IBClient>(e);
throw e;
}
});
return t1;
}

相关内容

  • 没有找到相关文章