将单个 REST 请求合并为批量 REST 请求



假设我有一个接受单个获取请求和批处理请求的 api:http://myapiendpoint.com/mysuperitems/1234

http://myapiendpoint.com/mysuperitems/1234,2345,456,5677

在我的代码中,我有一个获取单个项目的方法:

// Fetches one item by id from the single-item endpoint.
// The request/deserialization code is elided in this snippet.
async Task<mysuperitem> GetSingleItem(int x) {
// Single-item URL: the id is appended as the last path segment.
var endpoint = $"http://myapiendpoint.com/mysuperitems/{x}";
//... calls single request endpoint
}

但我想做的是将单个调用池化为批量调用。

// Desired shape of the single-item call: instead of hitting the endpoint directly,
// the request is queued and the returned task finishes once the batch that
// contains it has completed. The implementation is elided in this snippet.
async Task<mysuperitem> GetSingleItem(int x) {
//... pool this request in a queue and retrieve it when batch complete
}
// Desired shape of the batch call: fetches all requested ids in one request and
// signals each waiting single-item call when its result is available.
// The implementation is elided in this snippet.
async Task<IEnumerable<mysuperitem>> GetMultiItem(IEnumerable<int> ids){
//... gets items and lets single item know it's done
}

我该如何异步地批处理这些调用,并在批处理完成时通知各个单独的调用?我在考虑用 ConcurrentQueue 配合 Timer 来实现?

似乎Task.WhenAll是你需要的:

// Fetches one item by id; the actual call to the single-item endpoint is elided.
async Task<mysuperitem> GetSingleItem(int x)
{
return await ... // calls single request endpoint
}
// Fetches every requested item by starting one GetSingleItem call per id and
// awaiting all of them together; results come back in the same order as ids.
async Task<IEnumerable<mysuperitem>> GetMultiItem(IEnumerable<int> ids)
{
    IEnumerable<Task<mysuperitem>> lookups = ids.Select(itemId => GetSingleItem(itemId));
    mysuperitem[] items = await Task.WhenAll(lookups);
    return items;
}

是的,你可以使用 System.Timers 的 Timer 及其 Timer.Interval。我会使用一个普通的 Dictionary(以 id 为键、任务为值),这样既简单,又能轻松地把 id 映射到任务;反正您很可能会在每个间隔把所有请求一起批处理,所以实际上并不需要队列。然后只需让 GetSingleItem 与由计时器调用的 GetMultiItem 保持同步,如下所示:

// Requests waiting for the next batch, keyed by item id. A TaskCompletionSource is
// used because a Task cannot be created empty and completed later by hand.
private readonly Dictionary<int, TaskCompletionSource<mysuperitem>> _batchbuffer =
    new Dictionary<int, TaskCompletionSource<mysuperitem>>();
private readonly object _lock = new object();

// Queues a request for the given id and returns a task that completes when the
// batch containing it has been processed. Concurrent requests for the same id
// share one pending task instead of overwriting (and orphaning) each other.
Task<mysuperitem> GetSingleItem(int id) {
    lock (_lock) {
        if (!_batchbuffer.TryGetValue(id, out var pending)) {
            pending = new TaskCompletionSource<mysuperitem>(TaskCreationOptions.RunContinuationsAsynchronously);
            _batchbuffer[id] = pending;
        }
        return pending.Task;
    }
}

// Intended to be called from the timer: takes a snapshot of everything queued so
// far, performs one batch request for those ids and completes the waiting tasks.
async Task GetMultiItem() {
    Dictionary<int, TaskCompletionSource<mysuperitem>> temp;
    lock (_lock) {
        // Copy and clear under the lock so requests arriving during the batch
        // call are kept for the next tick.
        temp = new Dictionary<int, TaskCompletionSource<mysuperitem>>(_batchbuffer);
        _batchbuffer.Clear();
    }
    if (temp.Count == 0) {
        return;
    }
    // Placeholder kept from the original sketch: perform the batch request here.
    var batchResults = // do batch request for temp.Keys;
    foreach (var result in batchResults) {
        if (temp.TryGetValue(result.id, out var pending)) {
            pending.TrySetResult(result);
        }
    }
    // Any id the batch did not return must not leave its caller waiting forever.
    foreach (var entry in temp) {
        entry.Value.TrySetException(
            new KeyNotFoundException($"Batch result contained no item for id {entry.Key}."));
    }
}

当然,这种批处理是为了减少服务器/网络负载;如果您想提高的是客户端性能,那就是另一回事了。

这是第一次粗略的尝试:

using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using timers = System.Timers;
using System.Threading;
using System.Collections.Concurrent;
namespace MyNamespace
{
    /// <summary>
    /// Pools individual <see cref="Get"/> calls and serves them through a single batch
    /// call that is issued every time the timer interval elapses.
    /// </summary>
    /// <typeparam name="TReturn">Type of the value produced for each request.</typeparam>
    /// <typeparam name="TArgs">Type of the request key; used as a dictionary key.</typeparam>
    class BatchPoolConsumer<TReturn, TArgs> : IDisposable
    {
        private readonly object _gate = new object();

        // Requests waiting for the next batch. Guarded by _gate.
        private readonly Dictionary<TArgs, TaskCompletionSource<TReturn>> _pending =
            new Dictionary<TArgs, TaskCompletionSource<TReturn>>();

        private readonly timers.Timer _batchTimer;
        private readonly Func<IEnumerable<TArgs>, Task<IEnumerable<(TArgs, TReturn)>>> _batchProcessor;
        private readonly int _consumerMaxBatchConsumption;
        private bool _disposed;

        /// <summary>Creates the consumer and starts the batch timer.</summary>
        /// <param name="batchProcessor">Callback that resolves a set of keys to (key, value) pairs.</param>
        /// <param name="interval">Time between two batch runs.</param>
        /// <param name="consumerMaxBatchConsumption">Maximum number of keys handed to one batch call.</param>
        /// <exception cref="ArgumentNullException"><paramref name="batchProcessor"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="consumerMaxBatchConsumption"/> is not positive.</exception>
        public BatchPoolConsumer(Func<IEnumerable<TArgs>, Task<IEnumerable<(TArgs, TReturn)>>> batchProcessor, TimeSpan interval, int consumerMaxBatchConsumption)
        {
            if (batchProcessor == null)
            {
                throw new ArgumentNullException(nameof(batchProcessor));
            }
            if (consumerMaxBatchConsumption <= 0)
            {
                // A non-positive limit would select no keys, so no request could ever complete.
                throw new ArgumentOutOfRangeException(nameof(consumerMaxBatchConsumption), "The batch size must be positive.");
            }

            _batchProcessor = batchProcessor;
            _consumerMaxBatchConsumption = consumerMaxBatchConsumption;
            _batchTimer = new timers.Timer(interval.TotalMilliseconds);
            _batchTimer.Elapsed += BatchTimerElapsed;
            _batchTimer.Start();
        }

        /// <summary>
        /// Queues a request for <paramref name="args"/> and completes when the batch that
        /// contains it has been processed.
        /// </summary>
        /// <param name="args">Key to resolve.</param>
        /// <returns>The value the batch processor produced for the key.</returns>
        /// <exception cref="ObjectDisposedException">The consumer has been disposed.</exception>
        /// <exception cref="KeyNotFoundException">The batch processor returned no value for the key.</exception>
        public async Task<TReturn> Get(TArgs args)
        {
            TaskCompletionSource<TReturn> completion;
            lock (_gate)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(GetType().Name);
                }

                // Callers asking for the same key before the next tick share one entry;
                // overwriting it would leave the earlier caller waiting forever.
                if (!_pending.TryGetValue(args, out completion))
                {
                    completion = new TaskCompletionSource<TReturn>(TaskCreationOptions.RunContinuationsAsynchronously);
                    _pending.Add(args, completion);
                }
            }

            return await completion.Task.ConfigureAwait(false);
        }

        /// <summary>Stops the timer and cancels every request that is still queued.</summary>
        public void Dispose()
        {
            List<TaskCompletionSource<TReturn>> abandoned;
            lock (_gate)
            {
                if (_disposed)
                {
                    return;
                }
                _disposed = true;
                abandoned = _pending.Values.ToList();
                _pending.Clear();
            }

            _batchTimer.Stop();
            _batchTimer.Dispose();
            foreach (var completion in abandoned)
            {
                completion.TrySetCanceled();
            }
        }

        // Removes up to _consumerMaxBatchConsumption queued requests and returns them.
        private Dictionary<TArgs, TaskCompletionSource<TReturn>> TakeBatch()
        {
            lock (_gate)
            {
                var batch = _pending
                    .Take(_consumerMaxBatchConsumption)
                    .ToDictionary(entry => entry.Key, entry => entry.Value);
                foreach (var key in batch.Keys)
                {
                    _pending.Remove(key);
                }
                return batch;
            }
        }

        // Timer callback. async void is acceptable here because it is an event handler
        // and every failure is caught and forwarded to the waiting callers;
        // System.Timers.Timer would otherwise swallow the exception silently.
        private async void BatchTimerElapsed(object sender, timers.ElapsedEventArgs e)
        {
            var batch = TakeBatch();
            if (batch.Count == 0)
            {
                return;
            }

            var batchId = Guid.NewGuid();
            try
            {
                Console.WriteLine($"Processing batch {batchId} for {batch.Count} items");
                var results = await _batchProcessor(batch.Keys.ToList()).ConfigureAwait(false);
                Console.WriteLine($"Completed batch {batchId} for {batch.Count} items");

                if (results != null)
                {
                    foreach (var result in results)
                    {
                        if (batch.TryGetValue(result.Item1, out var completion))
                        {
                            completion.TrySetResult(result.Item2);
                        }
                    }
                }

                // Keys the processor did not answer must fail instead of hanging.
                // TrySetException is a no-op for entries that already have a result.
                foreach (var entry in batch)
                {
                    entry.Value.TrySetException(
                        new KeyNotFoundException($"Batch {batchId} returned no result for key '{entry.Key}'."));
                }
            }
            catch (Exception ex)
            {
                foreach (var completion in batch.Values)
                {
                    completion.TrySetException(ex);
                }
            }
        }
    }
}

最新更新