我有以下代码:
var tasks = await taskSeedSource
.Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
.ToList()
.ToTask();
if (tasks.Count == 0)
{
return;
}
if (tasks.Contains(null))
{
tasks = tasks.Where(t => t != null).ToArray();
if (tasks.Count == 0)
{
return;
}
}
await Task.WhenAll(tasks);
其中taskSeedSource
是反应式可观察量。可能是这段代码有很多问题,但我至少看到两个:
- 我
- 正在收集任务,而没有它我也可以。
- 不知何故,返回的任务列表可能包含空值,即使
GetPendingOrRunningTask
是一个async
方法,因此永远不会返回null
。我不明白为什么会发生这种情况,所以我不得不在不了解问题原因的情况下防御它。
我想使用 AsyncEx 框架中的AsyncCountdownEvent
,而不是收集任务然后等待它们。
因此,我可以将倒计时事件传递给GetPendingOrRunningTask
这将立即递增它,并在等待其内部逻辑完成后返回之前发出信号。但是,我不明白如何将倒计时事件集成到 monad 中(这是反应式行话,不是吗?
正确的方法是什么?
编辑
伙计们,让我们忘记返回列表中神秘的空值。假设一切都是绿色的,代码是
var tasks = await taskSeedSource
.Select(taskSeed => GetPendingOrRunningTask(taskSeed, ...))
.ToList()
.ToTask();
await Task.WhenAll(tasks);
现在的问题是我该如何通过倒计时事件做到这一点?所以,假设我有:
var c = new AsyncCountdownEvent(1);
和
async Task GetPendingOrRunningTask<T>(AsyncCountdownEvent c, T taskSeed, ...)
{
c.AddCount();
try
{
await ....
}
catch (Exception exc)
{
// The exception is handled
}
c.Signal();
}
我的问题是我不再需要返回的任务。这些任务收集并等待获得所有工作项结束的时刻,但现在倒计时事件可用于指示工作何时结束。
我的问题是我不确定如何将其集成到反应式链中。从本质上讲,GetPendingOrRunningTask
可以async void
。在这里,我被困住了。
编辑 2
任务列表中空条目出现奇怪外观
@Servy是正确的,您需要从源头解决空Task
问题。没有人愿意回答关于如何解决一个问题,这个问题违反了你自己定义的方法的契约,但还没有提供检查的来源。
至于关于收集任务的问题,如果你的方法返回泛型Task<T>
,很容易通过Merge
来避免:
await taskSeedSource
.Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
.Where(task => task != null) // According to you, this shouldn't be necessary.
.Merge();
但是,不幸的是,非泛型Task
没有官方的Merge
重载,但这很容易定义:
public static IObservable<Unit> Merge(this IObservable<Task> sources)
{
return sources.Select(async source =>
{
await source.ConfigureAwait(false);
return Unit.Default;
})
.Merge();
}