How to process batches from a Skip/Take loop in parallel



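I have a list of orders that I'm trying to process 2000 at a time. I want to process each 2000-order batch concurrently and return once all of them have completed. In the sample code below, I take the list of orders, send each batch to CreateOrders, and when it completes I add the result to completedOrders, because I need to return all of them. How can I process these batches of 2000 in parallel?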

public List<Order> BatchOrders(List<Order> orders)
{
    var completedOrders = new List<Order>();
    int batchSize = 2000;
    // Create orders in batches
    for (int i = 0; i < orders.Count; i += batchSize)
    {
        var batchOrders = orders.Skip(i).Take(batchSize).ToList();
        completedOrders.AddRange(CreateOrders(batchOrders));
    }
    return completedOrders;
}

Try a helper method like this:

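using System.Collections.Generic;

public static class BatchExtensions
{
    // Splits a sequence into lists of at most batchSize items, enumerating the source only once
    public static IEnumerable<List<T>> Batch<T>(this IEnumerable<T> col, int batchSize = 2000)
    {
        var batch = new List<T>(batchSize);
        foreach (var o in col)
        {
            batch.Add(o);
            if (batch.Count == batchSize)
            {
                // Hand out the full batch and start a new one
                var rc = batch;
                batch = new List<T>(batchSize);
                yield return rc;
            }
        }
        // Emit the final, possibly smaller, batch
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}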
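On .NET 6 and later, the built-in `Enumerable.Chunk` method produces the same kind of batches (as arrays), so a hand-written helper is mainly useful on older frameworks. A minimal sketch, assuming .NET 6+; `ChunkDemo` and `SplitIntoBatches` are hypothetical names used only for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ChunkDemo
{
    // Hypothetical helper: splits items into arrays of at most batchSize elements.
    // Enumerable.Chunk (.NET 6+) enumerates the source once, like the Batch helper above.
    public static List<T[]> SplitIntoBatches<T>(IEnumerable<T> items, int batchSize) =>
        items.Chunk(batchSize).ToList();
}
```

For 4500 items and a batch size of 2000 this yields three arrays of 2000, 2000 and 500 items.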

Then process each batch in parallel, something like this:

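using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public List<Order> BatchOrders(List<Order> orders)
{
    var completedOrderBatches = new ConcurrentBag<List<Order>>();
    // Process at most 4 batches at the same time
    var opts = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(orders.Batch(), opts, batch =>
    {
        completedOrderBatches.Add(CreateOrders(batch));
    });
    // Flatten the completed batches back into a single list
    return completedOrderBatches.SelectMany(c => c).ToList();
}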
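`ConcurrentBag<T>` is unordered, so with the code above the completed batches can come back in any order. If the original order matters, one alternative is PLINQ with `AsOrdered()` (a different technique from `Parallel.ForEach`, swapped in here). A minimal sketch, assuming .NET 6+ for `Chunk` and C# 9 records; `Order` and `CreateOrders` are hypothetical stand-ins for the question's types:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the question's Order type
public record Order(int Id, bool Created = false);

public static class PlinqBatching
{
    // Hypothetical stub: marks every order in the batch as created
    public static List<Order> CreateOrders(IReadOnlyList<Order> batch) =>
        batch.Select(o => o with { Created = true }).ToList();

    public static List<Order> BatchOrders(List<Order> orders, int batchSize = 2000) =>
        orders
            .Chunk(batchSize)               // .NET 6+: arrays of at most batchSize orders
            .AsParallel()
            .AsOrdered()                    // keep results in the original batch order
            .WithDegreeOfParallelism(4)     // process at most 4 batches at once
            .SelectMany(batch => CreateOrders(batch))
            .ToList();
}
```

`AsOrdered()` makes PLINQ buffer results so they come out in source order, which can cost some throughput compared with the unordered version.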

If you need the batch number in CreateOrders, you can use Parallel.For instead of Parallel.ForEach, just as you would use for instead of foreach.

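using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public List<Order> BatchOrders(List<Order> orders)
{
    var completedOrders = new ConcurrentBag<List<Order>>();
    var opts = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    int batchSize = 2000;
    // Materialize the batches so they can be accessed by index
    var batches = orders.Batch(batchSize).ToList();
    Parallel.For(0, batches.Count, opts, batchNum =>
    {
        var batch = batches[batchNum];
        // Index of this batch's first order in the original list
        var startId = batchNum * batchSize;
        // Assumes an overload CreateOrders(List<Order>, int)
        completedOrders.Add(CreateOrders(batch, startId));
    });
    return completedOrders.SelectMany(c => c).ToList();
}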

Something like this?

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public List<Order> BatchOrders(List<Order> orders)
{
    int batchSize = 2000;
    List<Task<IEnumerable<Order>>> tasks = new List<Task<IEnumerable<Order>>>();
    // Create orders in batches
    for (int i = 0; i < orders.Count; i += batchSize)
    {
        var batchOrders = orders.Skip(i).Take(batchSize).ToList();
        // Run CreateOrders as a task and store the task.
        // The explicit type argument lets this compile even if CreateOrders returns List<Order>.
        tasks.Add(Task.Run<IEnumerable<Order>>(() => CreateOrders(batchOrders)));
    }
    var allTasks = tasks.ToArray();
    // Wait until all the tasks are complete
    Task.WaitAll(allTasks);
    var completedOrders = new List<Order>();
    // Merge the results
    foreach (var task in allTasks)
        completedOrders.AddRange(task.Result);
    return completedOrders;
}

Here is how you can do it with the help of the TPL Dataflow library. You need two blocks: a BatchBlock that does the batching and a TransformManyBlock that processes each batch.

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

List<Order> BatchOrders(List<Order> orders,
    CancellationToken cancellationToken = default)
{
    // Groups incoming orders into arrays of 2000
    var batchBlock = new BatchBlock<Order>(batchSize: 2000);
    // Runs CreateOrders on up to 5 batches at a time and flattens the results
    var transformBlock = new TransformManyBlock<Order[], Order>(batch =>
    {
        return CreateOrders(batch.ToList());
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 5,
        CancellationToken = cancellationToken
    });
    batchBlock.LinkTo(transformBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });
    foreach (var order in orders)
    {
        batchBlock.Post(order);
    }
    // Flushes the last, possibly partial, batch and completes the pipeline
    batchBlock.Complete();
    var results = new List<Order>();
    while (transformBlock.OutputAvailableAsync().Result)
    {
        while (transformBlock.TryReceive(out var order))
        {
            results.Add(order);
        }
    }
    // Propagates any exception or cancellation from the pipeline
    transformBlock.Completion.GetAwaiter().GetResult();
    return results;
}

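This library is built into .NET Core. Advantages:

  1. Built-in support for configuring the maximum degree of parallelism
  2. Built-in cancellation support
  3. Fails fast when an exception occurs (unlike Task.WaitAll, for example)
  4. Can handle more complex scenarios
  5. Can be converted to async relatively easily

Disadvantages:

  1. Less familiar
  2. Retrieving the results from the final block is somewhat cumbersome
  3. Not built into .NET Framework (you need to install the System.Threading.Tasks.Dataflow package)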
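Regarding advantage 5, here is a minimal async sketch of the same pipeline. It assumes a .NET Core / .NET 5+ project where, as noted above, the Dataflow library is available without an extra package, plus C# 9 records; `Order` and `CreateOrdersAsync` are hypothetical stand-ins, and the stub just marks orders as created:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's Order type
public record Order(int Id, bool Created = false);

public static class DataflowAsyncBatching
{
    // Hypothetical async stub standing in for the real CreateOrders
    public static async Task<List<Order>> CreateOrdersAsync(List<Order> batch)
    {
        await Task.Yield(); // simulate asynchronous work
        return batch.Select(o => o with { Created = true }).ToList();
    }

    public static async Task<List<Order>> BatchOrdersAsync(
        IEnumerable<Order> orders, CancellationToken cancellationToken = default)
    {
        var batchBlock = new BatchBlock<Order>(batchSize: 2000);
        // The async lambda matches the Func<Order[], Task<IEnumerable<Order>>> overload
        var transformBlock = new TransformManyBlock<Order[], Order>(
            async batch => await CreateOrdersAsync(batch.ToList()),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 5,
                CancellationToken = cancellationToken
            });
        batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var order in orders)
        {
            // SendAsync waits asynchronously instead of blocking if the target postpones
            await batchBlock.SendAsync(order, cancellationToken);
        }
        batchBlock.Complete();

        var results = new List<Order>();
        while (await transformBlock.OutputAvailableAsync())
        {
            while (transformBlock.TryReceive(out var order))
                results.Add(order);
        }
        // Rethrows the exception if the pipeline faulted or was canceled
        await transformBlock.Completion;
        return results;
    }
}
```

The main changes are `SendAsync` instead of `Post`, `await` instead of `.Result` / `GetResult()`, and an async delegate for the `TransformManyBlock`.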

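I think the answers already provided are great!

I want to share mine because I like declarative, concise code.

Note that although this code is more readable, it is less efficient at runtime, because the source collection is enumerated once per batch.

The extension methods are defined on IReadOnlyCollection<T> rather than IEnumerable<T> to rule out lazily evaluated enumerables that load data from an external source.

First, some extension helpers: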

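using System.Collections.Generic;
using System.Linq;

public static class BatchExtensions
{
    // Returns the batchNumber-th slice of batchSize items
    public static IEnumerable<T> TakePart<T>(this IReadOnlyCollection<T> data, int batchNumber, int batchSize) =>
        data
            .Skip(batchNumber * batchSize)
            .Take(batchSize);

    // Round the batch count up so that a trailing partial batch is not dropped
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IReadOnlyCollection<T> data, int batchSize) =>
        Enumerable
            .Range(0, (data.Count + batchSize - 1) / batchSize)
            .Select(index => TakePart(data, index, batchSize));
}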
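The number of batches has to round up. With integer division, 4500 orders and a batch size of 2000 give `4500 / 2000 = 2`, so `Enumerable.Range(0, data.Count / batchSize)` would produce only two batches and silently drop the last 500 orders; `(count + batchSize - 1) / batchSize` gives 3. A minimal sketch of that arithmetic; `BatchMath`, `BatchCount` and `BatchSizes` are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchMath
{
    // Ceiling division: the number of batches needed to cover `count` items
    public static int BatchCount(int count, int batchSize) =>
        (count + batchSize - 1) / batchSize;

    // Size of each batch; only the last one can be smaller than batchSize
    public static List<int> BatchSizes(int count, int batchSize) =>
        Enumerable.Range(0, BatchCount(count, batchSize))
            .Select(i => Math.Min(batchSize, count - i * batchSize))
            .ToList();
}
```

For example, `BatchSizes(4500, 2000)` gives 2000, 2000 and 500, while an exact multiple such as 4000 gives just two full batches.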

Now your method is quite short and concise:

async Task<IEnumerable<CreatedOrder>> BatchOrders(List<Order> orders, int batchSize)
{
    // Queue one task per batch on the thread pool
    var batches =
        orders
            .Batch(batchSize)
            .Select(batch => Task.Run(() => CreateOrders(batch)))
            .ToArray();
    // Wait for every batch and flatten the results, in batch order
    var result = (await Task.WhenAll(batches)).SelectMany(x => x);
    return result;
}
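`Task.WhenAll` here starts every batch at once, so unlike the `MaxDegreeOfParallelism` settings in the other answers there is no explicit cap on concurrency. If you want one, a `SemaphoreSlim` is a common way to throttle. A minimal sketch, assuming .NET 6+ (for `Chunk`) and C# 9 records; `Order`, `CreatedOrder`, `CreateOrders`, `BatchOrdersAsync` and `maxParallel` are hypothetical stand-ins:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the types used in the answer
public record Order(int Id);
public record CreatedOrder(int Id);

public static class ThrottledBatching
{
    // Hypothetical stub for the real (synchronous) CreateOrders
    public static IEnumerable<CreatedOrder> CreateOrders(IEnumerable<Order> batch) =>
        batch.Select(o => new CreatedOrder(o.Id)).ToList();

    public static async Task<List<CreatedOrder>> BatchOrdersAsync(
        IReadOnlyList<Order> orders, int batchSize, int maxParallel = 4)
    {
        // At most maxParallel batches may be inside CreateOrders at any time
        using var throttle = new SemaphoreSlim(maxParallel);

        var tasks = orders
            .Chunk(batchSize) // .NET 6+
            .Select(async batch =>
            {
                await throttle.WaitAsync();
                try
                {
                    return await Task.Run(() => CreateOrders(batch));
                }
                finally
                {
                    throttle.Release();
                }
            })
            .ToArray();

        // WhenAll returns the results in the same order as the tasks
        var results = await Task.WhenAll(tasks);
        return results.SelectMany(r => r).ToList();
    }
}
```

The semaphore is only released once a batch finishes, so new batches wait their turn instead of all hitting the thread pool at the same moment.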
