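I need to do the following work:
- Fetch Page objects from the database
- For each page, fetch all of its images and process them (IO-bound, e.g. uploading them to a CDN)
- If all images were processed successfully, mark the Page as processed in the database
Since I need to control how many pages are processed in parallel, I decided to use TPL Dataflow: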
Data pipe: BufferBlock<Page> (BoundedCapacity = 1)
    |
Process images: TransformBlock<Page, Page> (BoundedCapacity = 1, MaxDegreeOfParallelism = 8)
    |
Save page: ActionBlock<Page> (BoundedCapacity = 1, MaxDegreeOfParallelism = 5)
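Now I need "Process images" to process the images in parallel, but I want to limit the total number of images being processed at any one time across all the pages that are being processed in parallel.
I could use a TransformManyBlock for "Process images", but how do I collect the images back together for the "Save page" block?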
Data pipe: BufferBlock<Page> (BoundedCapacity = 1)
    |
Load images: TransformManyBlock<Page, Image[]> (BoundedCapacity = 1, MaxDegreeOfParallelism = 8)
    |  (fans out: one message per image)
Process image: TransformBlock<ImageWithPage, ImageWithPage> (BoundedCapacity = 1, MaxDegreeOfParallelism = 8)
    |  How to group images by page?
Save page: ActionBlock<Page> (BoundedCapacity = 1, MaxDegreeOfParallelism = 5)
On top of that, any one of the images may fail to process, and I don't want to save a page that has failed images.
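You can group the images back together by keeping track, as they arrive, of how many images of a given page have come through, and then sending the page on once all of its images have arrived. To figure that out, the page needs to know how many images it contains, but I assume you know that.
In code, it could look something like this: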
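public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
        Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    // Number of parts seen so far for each merged item that is still incomplete.
    // A plain Dictionary is safe here because the TransformManyBlock below runs
    // with the default MaxDegreeOfParallelism of 1.
    var dictionary = new Dictionary<TMerged, int>();
    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            dictionary.TryGetValue(merged, out int count);
            count++;
            if (getSplitCount(merged) == count)
            {
                // All parts have arrived: forget the item and emit it.
                dictionary.Remove(merged);
                return new[] { merged };
            }
            dictionary[merged] = count;
            // Still waiting for more parts: emit nothing.
            return Array.Empty<TMerged>();
        });
}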
Usage:
var dataPipe = new BufferBlock<Page>();
var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);
var savePage = new ActionBlock<Page>(
    page => { /* save the page here */ },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
// Propagate completion so that calling dataPipe.Complete() eventually
// completes savePage as well.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
dataPipe.LinkTo(splitter, linkOptions);
splitter.LinkTo(processImage, linkOptions);
processImage.LinkTo(merger, linkOptions);
merger.LinkTo(savePage, linkOptions);
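The merger above does not cover the requirement that a page with a failed image must not be saved. One way to handle it is to let the image-processing step catch its own exceptions and report success or failure, and to have the merger drop any page that had a failure. This is a minimal sketch assuming that convention; `ImageResult` and `FailureAwareMerger` are hypothetical names, and like the merger above it is not thread-safe, so it should run in a block with MaxDegreeOfParallelism = 1.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message produced by the image-processing step: the page the
// image belongs to and whether processing that image succeeded.
public sealed record ImageResult<TPage>(TPage Page, bool Succeeded);

// Counts processed images per page and releases a page only when all of its
// images have arrived AND none of them failed. Not thread-safe: run it in a
// block with MaxDegreeOfParallelism = 1 (the dataflow default).
public sealed class FailureAwareMerger<TPage> where TPage : notnull
{
    private readonly Func<TPage, int> _getImageCount;
    private readonly Dictionary<TPage, (int Arrived, bool AnyFailed)> _state = new();

    public FailureAwareMerger(Func<TPage, int> getImageCount) =>
        _getImageCount = getImageCount ?? throw new ArgumentNullException(nameof(getImageCount));

    // Returns a one-element array with the page when it is complete and fully
    // successful, otherwise an empty array (suitable for a TransformManyBlock).
    public TPage[] Add(ImageResult<TPage> result)
    {
        // A page seen for the first time starts at (0, false).
        _state.TryGetValue(result.Page, out var s);
        s = (s.Arrived + 1, s.AnyFailed || !result.Succeeded);

        if (s.Arrived < _getImageCount(result.Page))
        {
            _state[result.Page] = s; // still waiting for more images
            return Array.Empty<TPage>();
        }

        _state.Remove(result.Page); // all images arrived: forget the page
        return s.AnyFailed ? Array.Empty<TPage>() : new[] { result.Page };
    }
}
```

It could be wired in as `new TransformManyBlock<ImageResult<Page>, Page>(r => merger.Add(r))`, with the processing block changed to emit `ImageResult<Page>` values. Catching exceptions inside the processing delegate matters here, because an exception that escapes a TransformBlock delegate faults the block (and, with completion propagation, the rest of the pipeline).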
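The .NET platform has a nice interface for representing parent-child relationships: the IGrouping<TKey, TElement> interface. It is simply an IEnumerable that also has a Key property. The key can be anything; in this case it can be the Page that needs to be processed. The contents of the grouping can be the Images that belong to each page and need to be uploaded. This leads to the idea of a dataflow block that processes IGrouping<TKey, TInput> objects by processing each TInput independently, then regrouping the results per grouping, and finally emitting them as IGrouping<TKey, TOutput> objects. Here is an implementation of this idea: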
public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();

    var actionBlock = new ActionBlock<Task<Task<TOutput>>>(taskTask =>
    {
        // An exception thrown by the following line would cause buggy behavior.
        // According to the documentation it should never fail.
        taskTask.RunSynchronously();
        return taskTask.Unwrap();
    }, dataflowBlockOptions);

    var completionCTS = new CancellationTokenSource();
    _ = actionBlock.Completion
        .ContinueWith(_ => completionCTS.Cancel(), TaskScheduler.Default);

    var transformBlock = new TransformBlock<IGrouping<TKey, TInput>,
        IGrouping<TKey, TOutput>>(async grouping =>
    {
        if (grouping == null) throw new InvalidOperationException("Null grouping.");
        var tasks = new List<Task<TOutput>>();
        foreach (var item in grouping)
        {
            // Create a cold task that will be either executed by the actionBlock,
            // or will be canceled by the completionCTS. This should eliminate
            // any possibility that an awaited task will remain cold forever.
            var taskTask = new Task<Task<TOutput>>(() => transform(grouping.Key, item),
                completionCTS.Token);
            var accepted = await actionBlock.SendAsync(taskTask);
            if (!accepted)
            {
                // The actionBlock has failed.
                // Skip the rest of the items. Pending tasks should still be awaited.
                tasks.Add(Task.FromCanceled<TOutput>(new CancellationToken(true)));
                break;
            }
            tasks.Add(taskTask.Unwrap());
        }
        TOutput[] results = await Task.WhenAll(tasks);
        return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
    }, dataflowBlockOptions);

    // Cleanup
    _ = transformBlock.Completion
        .ContinueWith(_ => actionBlock.Complete(), TaskScheduler.Default);
    _ = Task.WhenAll(actionBlock.Completion, transformBlock.Completion)
        .ContinueWith(_ => completionCTS.Dispose(), TaskScheduler.Default);

    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, TOutput> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        (key, item) => Task.FromResult(transform(key, item)), dataflowBlockOptions);
}
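The implementation consists of two blocks: a TransformBlock that processes the groupings, and an internal ActionBlock that processes the individual items. Both are configured with the same user-supplied options. The TransformBlock sends the items to be processed to the ActionBlock one at a time, then awaits the results, and finally builds the output IGrouping<TKey, TOutput> with the following tricky line: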
return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
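This compensates for the fact that the .NET platform currently has no public class that implements the IGrouping interface. The GroupBy+Single combination does the trick, but it has a limitation: it cannot create an empty IGrouping (when results is empty, GroupBy yields no groups and Single throws an InvalidOperationException). If this is a problem, creating a class that implements the interface is always an option. The implementation is quite simple (here is an example).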
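For reference, a minimal class implementing IGrouping<TKey, TElement> could look like the following sketch. The name `Grouping` is hypothetical; with it, the tricky line in CreateTransformGroupingBlock could become `return new Grouping<TKey, TOutput>(grouping.Key, results);`, which also works when a page has no images.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Minimal IGrouping<TKey, TElement> implementation. Unlike the
// GroupBy(...).Single() trick, it can represent an empty group.
public sealed class Grouping<TKey, TElement> : IGrouping<TKey, TElement>
{
    private readonly IReadOnlyList<TElement> _elements;

    public Grouping(TKey key, IEnumerable<TElement> elements)
    {
        Key = key;
        // Materialize once so the grouping can be enumerated repeatedly.
        _elements = (elements ?? throw new ArgumentNullException(nameof(elements))).ToArray();
    }

    public TKey Key { get; }

    public IEnumerator<TElement> GetEnumerator() => _elements.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```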
A usage example of the CreateTransformGroupingBlock method:
var processPages = new TransformBlock<Page, IGrouping<Page, Image>>(page =>
{
    Image[] images = GetImagesFromDB(page);
    return images.GroupBy(_ => page).Single(); // Convert to IGrouping
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var uploadImages = CreateTransformGroupingBlock<Page, Image, Image>(async (page, image) =>
{
    await UploadImage(image);
    return image;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var savePages = new ActionBlock<IGrouping<Page, Image>>(grouping =>
{
    var page = grouping.Key;
    foreach (var image in grouping) SaveImageToDB(image, page);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

// Propagate completion so that completing processPages also completes savePages.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
processPages.LinkTo(uploadImages, linkOptions);
uploadImages.LinkTo(savePages, linkOptions);
The uploadImages variable is of type TransformBlock<IGrouping<Page, Image>, IGrouping<Page, Image>>. In this example the types TInput and TOutput are the same, because the images don't need to be transformed.
Consider merging "Load images" and "Process images" into a single TransformBlock. That way you keep the images of a single page together.
To meet the concurrency-limiting goal, use a SemaphoreSlim:
// Created once and shared by all pages, so the limit applies across pages.
SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);
//...
var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
var images = GetImages(page);
ImageWithPage[] processedImages =
    images
    .AsParallel()
    .Select(i =>
    {
        processImageDopLimiter.Wait();
        try
        {
            return ProcessImage(i);
        }
        finally
        {
            // Release the slot even if ProcessImage throws.
            processImageDopLimiter.Release();
        }
    })
    .ToArray();
return new { page, processedImages };
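This will leave quite a few threads blocked while waiting. You can use an asynchronous version of the processing if you prefer; that is orthogonal to the question.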
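The asynchronous variant mentioned above could look roughly like the following. It is a minimal sketch, assuming the per-image work is exposed as a `Func<TImage, Task<TResult>>`; `ImageThrottling` and `ProcessAllAsync` are hypothetical names. The same SemaphoreSlim must be passed for every page so that the limit covers all pages processed in parallel, and `WaitAsync` waits without blocking a thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ImageThrottling
{
    // Processes all images of one page concurrently, but every image must first
    // acquire a slot from `limiter`. Pass the SAME SemaphoreSlim for every page so
    // the limit applies to the total number of images in flight.
    public static async Task<TResult[]> ProcessAllAsync<TImage, TResult>(
        IEnumerable<TImage> images,
        SemaphoreSlim limiter,
        Func<TImage, Task<TResult>> processImage)
    {
        var tasks = images.Select(async image =>
        {
            await limiter.WaitAsync().ConfigureAwait(false); // waits without blocking a thread
            try
            {
                return await processImage(image).ConfigureAwait(false);
            }
            finally
            {
                limiter.Release(); // free the slot even if processing failed
            }
        }).ToList(); // start all tasks now

        // Completes when every image is done; if any failed, awaiting rethrows
        // the first exception, so no result array is produced for that page.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

Inside the merged page-level TransformBlock, the delegate would become async and await this method with the page's images, which keeps each page's images together while the shared semaphore caps the global number of uploads.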