<T> 在任何给定时间运行 X 个任务,同时保持 UI 响应



我有一个C#WinForms(.NET 4.5.2)应用TPL。该工具具有同步函数,该函数将传递到任务工厂X次数(具有不同的输入参数),其中X是用户在开始过程之前声明的数字。任务是启动并存储在List<Task>中的。

假设用户输入5,我们将其在async按钮中单击处理程序:

for (int i = 0; i < X; i++)
{
    var progress = Progress(); // returns a new IProgress<T>
    var task = Task<int>.Factory.StartNew(() => MyFunction(progress), TaskCreationOptions.LongRunning);
    TaskList.Add(task);
}

每个progress实例更新UI。

现在,一旦完成任务,我想启动一个新的。本质上,该过程应无限期地运行,除非用户通过UI取消X任务运行(我将使用取消令牌为此)。我尝试使用以下内容来实现这一目标:

while (TaskList.Count > 0)
{
    var completed = await Task.WhenAny(TaskList.ToArray());                                  
    if (completed.Exception == null)
    {
        // report success
    }
    else
    {
        // flatten AggregateException, print out, etc
    }
    // update some labels/textboxes in the UI, and then:
    TaskList.Remove(completed);
    var task = Task<int>.Factory.StartNew(() => MyFunction(progress), TaskCreationOptions.LongRunning);
    TaskList.Add(task);
}

这陷入UI。是否有更好的方法可以实现此功能,同时保持UI响应良好?

在评论中提出了使用TPL数据流的建议,但是由于时间限制和规格,欢迎替代解决方案

update

我不确定进度报告可能是问题吗?这是它的样子:

private IProgress<string> Progress()
{
    return new Progress<string>(msg =>
    {
        txtMsg.AppendText(msg);
    });
}

现在,一旦完成任务,我想启动一个新的。本质上,该过程应无限期运行,在任何给定时间>

在我看来,您想要一个无限循环您的任务:

for (int i = 0; i < X; i++)
{
  var progress = Progress(); // returns a new IProgress<T>
  var task = RunIndefinitelyAsync(progress);
  TaskList.Add(task);
}
private async Task RunIndefinitelyAsync(IProgress<T> progress)
{
  while (true)
  {
    try
    {
      await Task.Run(() => MyFunction(progress));
      // handle success
    }
    catch (Exception ex)
    {
      // handle exceptions
    }
    // update some labels/textboxes in the UI
  }
}

但是,我怀疑"陷入UI"可能在// handle success和/或// handle exceptions代码中。如果我的怀疑是正确的,请将尽可能多的逻辑推入Task.Run

据我了解,您只需要并行的并行程度的并行执行即可。有很多方法可以实施您想要的东西。我建议使用阻止集合和并行类而不是任务。

因此,当用户单击按钮时,您需要创建一个新的封锁集合,这将是您的数据源:

BlockingCollection<IProgress> queue = new BlockingCollection<IProgress>();
CancellationTokenSource source = new CancellationTokenSource();

现在您需要一个并行执行您的跑步者:

Task.Factory.StartNew(() =>
    Parallel.For(0, X, i =>
    {
        foreach (IProgress p in queue.GetConsumingEnumerable(source.Token))
        {
            MyFunction(p);
        }
    }), source.Token);

,或者您可以使用分区器选择更多正确的方法。因此,您需要一个分区类:

private class BlockingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _Collection;
    private readonly CancellationToken _Token;
    public BlockingPartitioner(BlockingCollection<T> collection, CancellationToken token)
    {
        _Collection = collection;
        _Token = token;
    }
    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        throw new NotImplementedException();
    }
    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _Collection.GetConsumingEnumerable(_Token);
    }
    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }
}

和跑步者将看起来像这样:

ParallelOptions Options = new ParallelOptions();
Options.MaxDegreeOfParallelism = X;
Task.Factory.StartNew(
    () => Parallel.ForEach(
        new BlockingPartitioner<IProgress>(queue, source.Token),
        Options,
        p => MyFunction(p)));

因此,您现在需要的只是填充queue所需的数据。您可以随时进行。

和最终触摸,当用户取消操作时,您有两个选择:

  • 首先,您可以使用source.Cancel打破执行,
  • 或者您可以通过标记收集完成(queue.CompleteAdding)来优雅地停止执行,在这种情况下,Runner将执行所有已排队的数据并完成。

当然,您需要其他代码来处理异常,进度,状态等。但是主要想法就在这里。

最新更新