如果具有相同ID的对象当前正由另一个任务处理,则延迟对象的任务处理



假设我有对象:

ObjectA(
int ID,
List<ObjectB> items
)
ObjectB(
int ItemId,
string value
)

从RabbitMQ中,我最多获取10条消息prefetchCount = 10,其中在反序列化之后,每条消息都成为ObjectA类型的对象。

一个ObjectA可以具有多个具有相同ItemIdObjectB

所有这些都是在.NET Core 3.1后台工作程序中完成的:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// code omitted for brevity
// register event that handles message received
consumer.Received += MessageReceived;
}
}

在对每个ObjectA进行反序列化之后,我运行两个任务,taskAtaskB,这两个任务目前并不重要。

重要的是TaskA,它处理ObjectA的所有项目-ObjectB的列表。CCD_ 12还为每个CCD_ 13运行多个任务。

如何确保在所有任务taskA或当前接收的所有RMQ消息中,一次只能处理一个具有相同ItemIdObjectB

示例

ObjectA (Id = 1, Items = {(1, "a"), (2, "b"), (1, "c")})

对于给定的对象,我必须开始并行处理项目(1, "a")(2, "b"),而(1, "c")应该等到正在处理(1, "a")的任务完成处理,因为它们的ItemId是相同的。

如果使用容器,则需要注册为Singleton。

在该示例中,为每个密钥创建一个锁,并且当前一个任务正在运行时,新任务将不会启动

public sealed class DelayTaskProcessor
{
private readonly Dictionary<string, object> _syncDict = new Dictionary<string, object>();
private readonly object _sync = new object();
public Task Create(Action action, string id)
{
return new Task(() =>
{
object currentSync = null;
lock (_sync)
{
if (_syncDict.ContainsKey(id))
{
currentSync = _syncDict[id];
}
else
{
currentSync = new object();
_syncDict.Add(id, currentSync);
}
}
lock (currentSync)
{
Console.WriteLine($"Start: {id}");
action?.Invoke();
}
});
}
}
public sealed class ObjectA
{
public int Id { get; set; }
public List<ObjectB> Items { get; set; }
}
public sealed class ObjectB
{
public int Id { get; set; }
public string Value { get; set; }
}
static void Main(string[] args)
{
var p = new DelayTaskProcessor();
var objectA = new ObjectA
{
Id = 1, Items = new List<ObjectB>()
{
new ObjectB {Id=1, Value="a"},
new ObjectB {Id=1, Value="a2"},
new ObjectB {Id=2, Value="b"},
new ObjectB {Id=3, Value="c"}
}
};
var tasks = new List<Task>(objectA.Items.Count);
foreach (var item in objectA.Items)
{
var task = p.Create(() =>
{
Console.WriteLine($"{item.Id},{item.Value}");
Thread.Sleep(2000);
}, $"b:{item.Id}");
tasks.Add(task);
task.Start();
}
Task.WaitAll(tasks.ToArray());
var exceptions = new List<Exception>();
foreach (var t in tasks)
{
if (t.IsFaulted)
{
exceptions.Add(t.Exception);

}
using (t) { }
}
}

我不得不将HERE和HERE的解决方案结合起来,对我有效的代码是:

private static readonly ConcurrentDictionary<object, RefCounted<SemaphoreSlim>> _lockDict = new ConcurrentDictionary<object, RefCounted<SemaphoreSlim>>();
private sealed class RefCounted<T>
{
public RefCounted(T value)
{
RefCount = 1;
Value = value;
}
public int RefCount { get; set; }
public T Value { get; private set; }
}
public DelayTaskProcessor()
{

}
public async Task<bool> ManageConcurrency(object taskId, Func<Task> task)
{
await GetOrCreate(taskId).WaitAsync();
try
{
await task();
return true;
}
catch (Exception ex)
{
return false;
}
finally
{
RefCounted<SemaphoreSlim> item;
lock (_lockDict)
{
item = _lockDict[taskId];
--item.RefCount;
if (item.RefCount == 0)
_lockDict.TryRemove(taskId, out var removed);
}
item.Value.Release();
}
}
private SemaphoreSlim GetOrCreate(object key)
{
RefCounted<SemaphoreSlim> item;
lock (_lockDict)
{
if (_lockDict.TryGetValue(key, out item))
{
++item.RefCount;
}
else
{
item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
_lockDict[key] = item;
}
}
return item.Value;
}
}

请随意评论。

最新更新