Dataflow: splitting work into small jobs and then grouping them again



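I need to do this kind of work:

  1. Get Page objects from the database
  2. For each page, get all of its images and process them (IO-bound, for example uploading to a CDN)
  3. If all images were processed successfully, mark the Page as processed in the database

Because I need to control how many pages are processed in parallel, I decided to use TPL Dataflow: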

 ____________________________
|         Data pipe          |
|   BufferBlock<Page>        |
|   BoundedCapacity = 1      |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|        Save page           |
| ActionBlock<Page>          |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 5 |
|____________________________|
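
For reference, the pipeline in the diagram above could be wired up roughly as follows. This is only a sketch: the Page class, the PagePipeline.Create helper and the two delegates are hypothetical names, not part of the question. One deliberate difference from the diagram: as far as I understand, BoundedCapacity also counts the items a block is currently processing, so a capacity of 1 would cap the effective parallelism at 1. The sketch therefore assumes a capacity equal to MaxDegreeOfParallelism on the two execution blocks.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the Page entity from the question.
public sealed class Page
{
    public int Id { get; set; }
}

public static class PagePipeline
{
    // Wires up the three blocks and returns the head (to send pages into)
    // and a task that completes when the last block has finished.
    public static (ITargetBlock<Page> Head, Task Completion) Create(
        Func<Page, Task<Page>> processImagesAsync,
        Func<Page, Task> savePageAsync)
    {
        var dataPipe = new BufferBlock<Page>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        var processImages = new TransformBlock<Page, Page>(
            processImagesAsync,
            new ExecutionDataflowBlockOptions
            {
                // Assumption: capacity raised to match the parallelism.
                BoundedCapacity = 8,
                MaxDegreeOfParallelism = 8
            });

        var savePage = new ActionBlock<Page>(
            savePageAsync,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 5,
                MaxDegreeOfParallelism = 5
            });

        // PropagateCompletion lets Complete() on the head flow down to the tail.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        dataPipe.LinkTo(processImages, link);
        processImages.LinkTo(savePage, link);

        return (dataPipe, savePage.Completion);
    }
}
```

A producer would feed the head with await head.SendAsync(page), which waits while the pipeline is full, then call head.Complete() and await the returned Completion task.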

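Now I need "Process images" to process the images in parallel, but I want to limit the total number of images being processed at any moment across all of the pages that are in flight.

I could use a TransformManyBlock for "Process images", but how do I then gather the images back together for the "Save page" block?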

         ____________________________
        |         Data pipe          |
        |   BufferBlock<Page>        |
        |   BoundedCapacity = 1      |
        |____________________________|
                      |
     ___________________________________
    |           Load images             |
    | TransformManyBlock<Page, Image[]> |
    | BoundedCapacity = 1               |
    | MaxDegreeOfParallelism = 8        |
    |___________________________________|
      /              |              \
   ______________________________________________
 _|____________________________________________  |
|              Process image                   | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1                          | |
| MaxDegreeOfParallelism = 8                   |_|
|______________________________________________|
                    |               /
         How to group images by page ?
                     |
        ____________________________
       |        Save page           |
       | ActionBlock<Page>          |
       | BoundedCapacity = 1        |
       | MaxDegreeOfParallelism = 5 |
       |____________________________|

On top of that, any one of the images may fail to process, and I don't want to save a page that has failed images.

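You can group the images together by recording each image of a given page as it arrives, and then sending the page on once all of its images have arrived. To figure that out, the page needs to know how many images it contains, but I assume you know that.

In code, it could look something like this: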

public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
    Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
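    // Number of parts seen so far for each merged item. A plain Dictionary is
    // safe here only because the block keeps the default MaxDegreeOfParallelism of 1.
    var dictionary = new Dictionary<TMerged, int>();
    return new TransformManyBlock<TSplit, TMerged>(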
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            dictionary.TryGetValue(merged, out count);
            count++;
            if (getSplitCount(merged) == count)
            {
                dictionary.Remove(merged);
                return new[] { merged };
            }
            dictionary[merged] = count;
            return new TMerged[0];
        });
}

Usage:

var dataPipe = new BufferBlock<Page>();
var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);
var savePage = new ActionBlock<Page>(
    page => { /* save the page here */ },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);
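
The merger above emits a page as soon as all of its images have arrived, but it does not cover the requirement that pages with failed images must not be saved. One possible way to handle that, shown here only as a sketch, is to let each part carry a success flag and have the merger drop the page if any part failed. The name CreateFailureAwareMergerBlock and the succeeded delegate are hypothetical. The sketch also assumes that "Process image" reports a failure through that flag instead of throwing, since an unhandled exception would fault the block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class MergerSketch
{
    // Counts every part of a merged item, but emits the merged item only
    // when all of its parts reported success.
    public static IPropagatorBlock<TSplit, TMerged>
        CreateFailureAwareMergerBlock<TSplit, TMerged>(
            Func<TSplit, TMerged> getMerged,
            Func<TMerged, int> getSplitCount,
            Func<TSplit, bool> succeeded)
    {
        // Not thread-safe; relies on the block's default
        // MaxDegreeOfParallelism of 1.
        var state = new Dictionary<TMerged, (int Count, bool AnyFailed)>();
        return new TransformManyBlock<TSplit, TMerged>(split =>
        {
            var merged = getMerged(split);
            state.TryGetValue(merged, out var entry); // (0, false) for a new key
            entry = (entry.Count + 1, entry.AnyFailed || !succeeded(split));
            if (entry.Count < getSplitCount(merged))
            {
                // Still waiting for more parts of this item.
                state[merged] = entry;
                return Array.Empty<TMerged>();
            }
            // Last part arrived: forget the item and emit it only if clean.
            state.Remove(merged);
            return entry.AnyFailed ? Array.Empty<TMerged>() : new[] { merged };
        });
    }
}
```

Like the original merger, this never emits an item that has zero parts, so pages without images would need separate handling.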

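The .NET platform has a nice interface for representing parent-child relationships: the IGrouping<TKey, TElement> interface. It is simply an IEnumerable that also has a Key property. The key can be anything, and in this case it can be the Page that needs to be processed. The contents of the grouping can be the Images that belong to each page and need to be uploaded. This leads to the idea of a dataflow block that processes IGrouping<TKey, TInput> objects by processing each TInput independently, then aggregating the results per grouping, and finally emitting them as IGrouping<TKey, TOutput> objects. Here is an implementation of this idea: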

public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();
    var actionBlock = new ActionBlock<Task<Task<TOutput>>>(taskTask =>
    {
        // An exception thrown by the following line would cause buggy behavior.
        // According to the documentation it should never fail.
        taskTask.RunSynchronously();
        return taskTask.Unwrap();
    }, dataflowBlockOptions);
    var completionCTS = new CancellationTokenSource();
    _ = actionBlock.Completion
        .ContinueWith(_ => completionCTS.Cancel(), TaskScheduler.Default);
    var transformBlock = new TransformBlock<IGrouping<TKey, TInput>,
        IGrouping<TKey, TOutput>>(async grouping =>
    {
        if (grouping == null) throw new InvalidOperationException("Null grouping.");
        var tasks = new List<Task<TOutput>>();
        foreach (var item in grouping)
        {
            // Create a cold task that will be either executed by the actionBlock,
            // or will be canceled by the completionCTS. This should eliminate
            // any possibility that an awaited task will remain cold forever.
            var taskTask = new Task<Task<TOutput>>(() => transform(grouping.Key, item),
                completionCTS.Token);
            var accepted = await actionBlock.SendAsync(taskTask);
            if (!accepted)
            {
                // The actionBlock has failed.
                // Skip the rest of the items. Pending tasks should still be awaited.
                tasks.Add(Task.FromCanceled<TOutput>(new CancellationToken(true)));
                break;
            }
            tasks.Add(taskTask.Unwrap());
        }
        TOutput[] results = await Task.WhenAll(tasks);
        return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
    }, dataflowBlockOptions);
    // Cleanup
    _ = transformBlock.Completion
        .ContinueWith(_ => actionBlock.Complete(), TaskScheduler.Default);
    _ = Task.WhenAll(actionBlock.Completion, transformBlock.Completion)
        .ContinueWith(_ => completionCTS.Dispose(), TaskScheduler.Default);
    return transformBlock;
}
// Overload with synchronous lambda
public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, TOutput> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        (key, item) => Task.FromResult(transform(key, item)), dataflowBlockOptions);
}

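The implementation consists of two blocks: a TransformBlock that processes the groupings, and an internal ActionBlock that processes the individual items. Both are configured with the same user-supplied options. The TransformBlock sends the items to be processed to the ActionBlock one by one, then waits for the results, and finally builds the output IGrouping<TKey, TOutput> with the following tricky line: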

return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping

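This compensates for the fact that there is currently no public class in the .NET platform that implements the IGrouping interface. The GroupBy+Single combination does the trick, but it has the limitation that it cannot create empty IGroupings, because Single throws on an empty sequence. If that is a problem, creating a class that implements the interface is always an option. Implementing one is quite straightforward.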
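
A minimal sketch of such a class could look like the following. The class name Grouping is hypothetical, and the constructor copies the elements so that the grouping is a stable snapshot. Unlike GroupBy+Single, it accepts an empty sequence.

```csharp
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Minimal IGrouping implementation that also allows empty groupings.
public sealed class Grouping<TKey, TElement> : IGrouping<TKey, TElement>
{
    private readonly IReadOnlyList<TElement> _elements;

    public Grouping(TKey key, IEnumerable<TElement> elements)
    {
        Key = key;
        _elements = elements.ToArray(); // snapshot of the elements; may be empty
    }

    public TKey Key { get; }

    public IEnumerator<TElement> GetEnumerator() => _elements.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

With a class like this, a line such as results.GroupBy(_ => grouping.Key).Single() could be replaced by new Grouping<TKey, TOutput>(grouping.Key, results).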

Usage example of the CreateTransformGroupingBlock method:

var processPages = new TransformBlock<Page, IGrouping<Page, Image>>(page =>
{
    Image[] images = GetImagesFromDB(page);
    return images.GroupBy(_ => page).Single(); // Convert to IGrouping
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var uploadImages = CreateTransformGroupingBlock<Page, Image, Image>(async (page, image) =>
{
    await UploadImage(image);
    return image;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var savePages = new ActionBlock<IGrouping<Page, Image>>(grouping =>
{
    var page = grouping.Key;
    foreach (var image in grouping) SaveImageToDB(image, page);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
processPages.LinkTo(uploadImages);
uploadImages.LinkTo(savePages);

The uploadImages variable is of type TransformBlock<IGrouping<Page, Image>, IGrouping<Page, Image>>. In this example the types TInput and TOutput are the same, because the images don't need to be transformed.

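Consider merging "Load images" and "Process image" into a single TransformBlock. That way you keep the images of a single page together.

To achieve the concurrency-limiting goal, use a SemaphoreSlim: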

SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);
//...
var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
var images = GetImages(page);
ImageWithPage[] processedImages =
 images
 .AsParallel()
 .Select(i => {
    processImageDopLimiter.Wait();
    try
    {
        return ProcessImage(i);
    }
    finally
    {
        // Release the slot even if ProcessImage throws.
        processImageDopLimiter.Release();
    }
 })
 .ToArray();
return new { page, processedImages };

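This will cause quite a few threads to block while waiting. If you prefer, you can use an asynchronous version of this processing. That is immaterial to the question.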
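
For illustration, an asynchronous variant might look roughly like this. It is a sketch under assumptions: the ThrottledProcessing class, the ProcessAllAsync method and the processImageAsync delegate are hypothetical names, and the semaphore is kept in a static field so that the limit of 8 applies across all pages rather than per page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledProcessing
{
    // Shared by all callers, so the limit is global rather than per page.
    private static readonly SemaphoreSlim Limiter = new SemaphoreSlim(8);

    public static async Task<TResult[]> ProcessAllAsync<TImage, TResult>(
        IEnumerable<TImage> images,
        Func<TImage, Task<TResult>> processImageAsync)
    {
        var tasks = images.Select(async image =>
        {
            // Waits asynchronously for a free slot; no thread is blocked.
            await Limiter.WaitAsync();
            try
            {
                return await processImageAsync(image);
            }
            finally
            {
                // Free the slot even if processing throws.
                Limiter.Release();
            }
        });
        // Results come back in the same order as the input images.
        return await Task.WhenAll(tasks);
    }
}
```

If any image fails, the awaited Task.WhenAll throws, so the calling block would need to catch that exception if a single failed page should not fault the whole pipeline.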
