我有两个.net Task对象,我可能希望并行或按顺序运行。无论哪种情况,我都不想阻塞线程来等待它们。事实证明,反应式扩展使平行故事变得简单而美丽。但是,当我尝试按顺序排列任务时,代码会起作用,但只是感觉很尴尬。
我想知道是否有人能展示如何使顺序版本更简洁,或者像并行版本一样轻松地编码。对于这个问题的答案,没有必要使用反应式扩展。
为了参考,这里是我针对并行和顺序处理的两个解决方案。
并行处理版本
这是纯粹的快乐:
public Task<string> DoWorkInParallel()
{
var result = new TaskCompletionSource<string>();
Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
Task<bool> BravoTask = Task.Factory.StartNew(() => true);
//Prepare for Rx, and set filters to allow 'Zip' to terminate early
//in some cases.
IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);
Observable
.Zip(
AsyncAlpha,
AsyncBravo,
(x, y) => y.ToString() + x.ToString())
.Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
(x) => { result.TrySetResult(x); },
(x) => { result.TrySetException(x.GetBaseException()); },
() => { result.TrySetResult("Nothing"); });
return result.Task;
}
顺序/管道处理版本
这是有效的,但只是笨拙:
public Task<string> DoWorkInSequence()
{
var result = new TaskCompletionSource<string>();
Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
AlphaTask.ContinueWith(x =>
{
if (x.IsFaulted)
{
result.TrySetException(x.Exception.GetBaseException());
}
else
{
if (x.Result != 5)
{
Task<bool> BravoTask = Task.Factory.StartNew(() => true);
BravoTask.ContinueWith(y =>
{
if (y.IsFaulted)
{
result.TrySetException(y.Exception.GetBaseException());
}
else
{
if (y.Result)
{
result.TrySetResult(x.Result.ToString() + y.Result.ToString());
}
else
{
result.TrySetResult("Nothing");
}
}
});
}
else
{
result.TrySetResult("Nothing");
}
}
}
);
return result.Task;
}
在上面的顺序代码中,它变得一团糟,我甚至没有添加与并行版本匹配的超时功能!
要求(更新日期:8/6)
对于那些回答的人,请注意:
顺序场景应该允许第一个任务的输出馈送第二个任务的输入的安排。我上面的示例"笨拙"代码本可以很容易地实现这一点。
我对.net 4.5的答案感兴趣,但.net 4.0的答案对我来说同样或更重要。
"阿尔法"one_answers"布拉沃"任务的总完成时间限制为200ms;它们每个没有200毫秒。在顺序情况下也是如此。
如果任一任务返回无效结果,则SourceCompletionTask必须在两个任务完成之前提前完成。无效结果为[AlphaTask:5]或[BravoTask:false],如示例代码中的显式测试所示
更新8/8:澄清-在顺序情况下,如果AlphaTask不成功或已经发生超时,则BravoTask根本不应执行。假设AlphaTask和BravoTask都不能阻止。这并不重要,但在我的真实场景中,它们实际上是异步WCF服务调用。
也许我可以利用Rx的某个方面来清理顺序版本。但我想,即使只是任务编程本身也应该有一个更好的故事。我们拭目以待。
ERRATA在这两个代码示例中,我都将返回类型更改为Task,因为海报上的答案非常正确,我不应该返回TaskCompletionSource。
如果你可以使用async/await,Brandon有一个很好的答案。如果你还在使用VS2010,我要做的第一件事就是清理顺序版本,那就是获得一个扩展方法,比如Stephen Toub在博客文章中描述的Then
方法。如果您不使用.NET4.5,我还会实现一个Task.FromResult
方法。有了这些,你可以得到:
public Task<string> DoWorkInSequence()
{
return Task.FromResult(4)
.Then(x =>
{ if (x != 5)
{
return Task.FromResult(true)
.Then(y =>
{ if (y)
{
return Task.FromResult(x.ToString() + y.ToString());
}
else
{
return Task.FromResult("Nothing");
}
});
}
else
{
return Task.FromResult("Nothing");
}
});
}
此外,您通常应该返回Task而不是TaskCompletionSource(您可以通过在TaskComplettionSource上调用.Task
来获得它),因为您不希望调用者设置您要返回给他们的任务的结果。
Brandon的回答还提供了一种实现超时功能的好方法(针对缺少async/await关键字进行调整)。
编辑为了减少箭头代码,我们可以实现更多的LINQ方法。SelectMany实现在之前链接的博客文章中提供。我们对LINQ需要的其他方法是Select和Where。一旦你完成了Then和SelectMany,这些应该相当简单,但现在它们是:
public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
{
if (task == null) throw new ArgumentNullException("task");
if (predicate == null) throw new ArgumentNullException("predicate");
var tcs = new TaskCompletionSource<T>();
task.ContinueWith((completed) =>
{
if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled) tcs.TrySetCanceled();
else
{
try
{
if (predicate(completed.Result))
tcs.TrySetResult(completed.Result);
else
tcs.TrySetCanceled();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
});
return tcs.Task;
}
public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
{
if (task == null) throw new ArgumentNullException("task");
if (selector == null) throw new ArgumentNullException("selector");
var tcs = new TaskCompletionSource<TResult>();
task.ContinueWith((completed) =>
{
if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled) tcs.TrySetCanceled();
else
{
try
{
tcs.TrySetResult(selector(completed.Result));
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
});
return tcs.Task;
}
之后,最后一个非LINQ扩展方法允许在取消时返回默认值:
public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
{
if (task == null) throw new ArgumentNullException("task");
var tcs = new TaskCompletionSource<T>();
task.ContinueWith((completed) =>
{
if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
else tcs.TrySetResult(completed.Result);
});
return tcs.Task;
}
以及新的和改进的DoWork(无超时):
public static Task<string> DoWorkInSequence()
{
return (from x in Task_FromResult(5)
where x != 5
from y in Task_FromResult(true)
where y
select x.ToString() + y.ToString()
).IfCanceled("Nothing");
}
Brandon答案中的Timeout方法(如果需要,在没有async/await的情况下重写)可以在整个超时期间卡在链的末端,和/或在链中的每个步骤之后,如果你想在达到整个超时后阻止进一步的步骤运行。链中断的另一种可能性是使所有单独的步骤都使用取消令牌,并修改Timeout方法以使用CancellationTokenSource,并在发生超时时取消它,以及抛出超时异常。
EDIT(Brent Arias)
从你的演讲中获得了一些绝妙的想法,我设计了我认为是POV的最终答案。它基于ParallelExtensionsExtras的nuget包中的.net 4.0扩展方法。下面的示例添加了第三个任务,以帮助说明顺序任务编程的"感觉",给出了我所说的要求:
public Task<string> DoWorkInSequence()
{
var cts = new CancellationTokenSource();
Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });
Task<int> AlphaTask = Task.Factory
.StartNew(() => 4 )
.Where(x => x != 5 && !cts.IsCancellationRequested);
Task<bool> BravoTask = AlphaTask
.Then(x => true)
.Where(x => x && !cts.IsCancellationRequested);
Task<int> DeltaTask = BravoTask
.Then(x => 7)
.Where(x => x != 8);
Task<string> final = Task.Factory
.WhenAny(DeltaTask, timer)
.ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing");
//This is here just for experimentation. Placing it at different points
//above will have varying effects on what tasks were cancelled at a given point in time.
cts.Cancel();
return final;
}
在这次讨论和共同努力中,我提出了一些关键的意见:
- 在琐碎的情况下使用"Then"扩展是不错的,但值得注意的是,它的适用性有限。对于更复杂的情况,有必要将其替换为例如
.ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default)
。对于我所说的场景,将"Then"替换为"ContinueWith"时,添加OnlyOnRanToCompletion
选项至关重要 - 在我的场景中,使用Timeout扩展最终是行不通的。这是因为它只会导致取消它立即附加到的Task,而不是取消序列中的所有先行Task实例。这就是为什么我切换到
StartNewDelayed(...)
策略,并在每个Where
子句中添加了一个明确的取消检查 - 尽管ParallelExtensionsExtras库中定义了您使用的LINQ to Tasks,但我得出的结论是,最好不要在Tasks中出现LINQ风格的内容。这是因为使用LINQ的任务非常深奥;这可能会让普通开发人员感到困惑。让他们理解异步编码已经够难的了。就连LINQ to Tasks的作者也说:"这个LINQ实现在实践中有多有用是有争议的,但至少它提供了一个有趣的思考练习。"。当然,我必须至少承认"Where"LINQ to Tasks方法,因为它在我上面列出的解决方案中发挥了关键作用
public Task<string> DoWorkInSequence()
{
Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
Func<int> BravoFunc = x => 2 * x;
//Prepare for Rx, and set filters to allow 'Zip' to terminate early
//in some cases.
IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
return AsyncAlpha
.Do(x => Console.WriteLine(x)) //This is how you "Do WORK in sequence"
.Select(BravoFunc) //This is how you map results from Alpha
//via a second method.
.Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
(x) => { result.TrySetResult(x); },
(x) => { result.TrySetException(x.GetBaseException()); },
() => { result.TrySetResult("Nothing"); }).ToTask();
}
然而,如果您想要Tasks,或者使用Observable.ToTask(this IObservable<T> observable)
而不是使用TaskCompletionSource
首先,我不会返回TaskCompletionSource
。这是达到目的的手段。。。应该对公共API隐藏的方法的实现细节。您的方法应该返回一个Task
(它应该只返回result.Task
)。
无论如何,如果你只是在处理任务,你应该只使用TPL,而不是使用Rx。只有当您确实需要将任务与其他Rx代码集成时,才使用Rx。如果你不混合Rx的东西,即使你的DoWorkInParallel
也可以变得简单得多。Rx在处理复杂的任务时表现出色。但您所描述的场景相对简单,可以通过TPL简单地解决。
以下是如何在TPL:中同时执行并行和顺序版本
/// <summary>Extension methods for timing out tasks</summary>
public static class TaskExtensions
{
/// <summary> throws an error if task does not complete before the timer.</summary>
public static async Task Timeout(this Task t, Task timer)
{
var any = await Task.WhenAny(t, timer);
if (any != t)
{
throw new TimeoutException("task timed out");
}
}
/// <summary> throws an error if task does not complete before the timer.</summary>
public static async Task<T> Timeout<T>(this Task<T> t, Task timer)
{
await Timeout((Task)t, timer);
return t.Result;
}
/// <summary> throws an error if task does not complete in time.</summary>
public static Task Timeout(this Task t, TimeSpan delay)
{
return t.IsCompleted ? t : Timeout(t, Task.Delay(delay));
}
/// <summary> throws an error if task does not complete in time.</summary>
public static Task<T> Timeout<T>(this Task<T> t, TimeSpan delay)
{
return Timeout((Task)t, delay);
}
}
// .. elsewhere ..
public async Task<string> DoWorkInParallel()
{
var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
var alphaTask = Task.Run(() => 4);
var betaTask = Task.Run(() => true);
// wait for one of the tasks to complete
var t = await Task.WhenAny(alphaTask, betaTask).Timeout(timer);
// exit early if the task produced an invalid result
if ((t == alphaTask && alphaTask.Result != 5) ||
(t == betaTask && !betaTask.Result)) return "Nothing";
// wait for the other task to complete
// could also just write: await Task.WhenAll(alphaTask, betaTask).Timeout(timer);
await ((t == alphaTask) ? (Task)betaTask : (Task)alphaTask).Timeout(timer);
// unfortunately need to repeat the validation logic here.
// this logic could be moved to a helper method that is just called in both places.
var alpha = alphaTask.Result;
var beta = betaTask.Result;
return (alpha != 5 && beta) ? (alpha.ToString() + beta.ToString()) : "Nothing";
}
public async Task<string> DoWorkInSequence()
{
var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
var alpha = await Task.Run(() => 4).Timeout(timer);
if (alpha != 5)
{
var beta = await Task.Run(() => true).Timeout(timer);
if (beta)
{
return alpha.ToString() + beta.ToString();
}
}
return "Nothing";
}
如果您需要在.Net 4.0中完成工作,则可以使用Microsoft.Bcl.Async nuget包,该包允许您使用VS2012编译器以.Net 4.0为目标,并且仍然使用Async/await。看到这个SO问题:在.net 4 上使用异步等待
编辑:如果任务产生无效值,我已经修改了并行和顺序版本的代码,使其提前退出,并且我还修改了将超时合并而不是按任务。尽管在顺序的情况下,此计时器也将计算两个任务之间的时间。
Aron几乎在上
public Task<string> DoWorkSequentially()
{
Task<int> AlphaTask = Task.Run(() => 4); //Some work;
Task<bool> BravoTask = Task.Run(() => true);//Some other work;
//Prepare for Rx, and set filters to allow 'Zip' to terminate early
//in some cases.
IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);
return (from alpha in AsyncAlpha
from bravo in AsyncBravo
select bravo.ToString() + alpha.ToString())
.Timeout(TimeSpan.FromMilliseconds(200))
.Concat(Observable.Return("Nothing")) //Return Nothing if no result
.Take(1)
.ToTask();
}
在这里,我刚刚将BravoFunc
放回BravoTask
。我已经移除了TaskCompletionSource
(就像Aron一样)。最后,使用ToTask()
操作符将Rx延续返回为Task<string>
。
注意
from alpha in AsyncAlpha
from bravo in AsyncBravo
select bravo.ToString() + alpha.ToString()
也可以写成
AsyncAlpha.SelectMany(a=>AsyncBravo.Select(b=> b.ToString() + a.ToString()))
SelectMany运算符对于这些类型的continuation非常方便。在查询理解语法中,它甚至更方便,因为您仍然可以访问最终选择子句中的bravo
和alpha
。
正如您所看到的,一旦您有了许多延续,这将变得非常有用。例如,考虑一个需要3或4个连续的例子
from a in Alpha
from b in Bravo
from c in Charlie
from d in Delta
select a+b+c+d
这在现实世界中也有一些应用。我认为这是一种常见的模式。一些例子包括:;等待连接服务器,然后获取会话令牌以传递给服务客户端。
from isConnected in _server.ConnectionState.Where(c=>c)
from session in _server.GetSession()
from customer in _customerServiceClient.GetCustomers(session)
select customer;
或者在社交媒体订阅源中,我们需要进行身份验证,找到联系人,获取他们的电子邮件列表,然后下拉这些电子邮件的前20个标题。
from accessToken in _oauth.Authenticate()
from contact in _contactServiceClient.GetContact(emailAddress, accessToken)
from imapMessageId in _mailServiceClient.Search(contact).Take(20)
from email in _mailServiceClient.GetEmailHeaders(imapMessageId)
select email;