使用有界通道的生产者使用者<T>,具有严格的内存分配要求



这是我的场景¹。我有一个生产者使用者系统,该系统由两个生产者、一个使用者和一个配置容量为 2 的有界Channel<T>组成。Tbyte[].

Channel<byte[]> channel = Channel.CreateBounded<byte[]>(2);

通过通道传播的byte[]很大(每个 1GB),这就需要将任何给定时刻存在的阵列总数限制在最低限度。所以两位制片人在创建new byte[1_000_000_000]之前等待,直到他们知道频道中有空位。这是第一个生产者:

Task producer1 = Task.Run(async () =>
{
while (true)
{
await channel.Writer.WaitToWriteAsync();
// The channel has space available. Let's create the array.
byte[] array = new byte[1_000_000_000];
// Here initialize the array (mainly I/O bound, time consuming)
// At this moment the channel might be full,
// because the other producer filled the gap.
await channel.Writer.WriteAsync(array);
}
});

第二个生产者是相同的。不幸的是,这允许两个生产者开始创建一个新阵列,即使通道中只有一个空插槽。因此,在某个时刻,系统可能同时有 4 个巨大的阵列处于活动状态:1 个由消费者使用,1 个存储在通道中,2 个同时创建两个生产者(试图填充一个空插槽)。

我想将托管内存中的数组总数限制为 3。有什么方法可以驯服我的制作人,这样他们就不会开始创建新byte[],直到频道中肯定有可用的空间?换句话说,在创建数组后,生产者应该能够立即将其写入通道中,如下所示:

bool success = channel.Writer.TryWrite(array);

。并且success应始终true.

¹此场景是人为的。它的灵感来自最近的GitHub问题。

>澄清:字节数组的构造和初始化是生产者的专有责任,应该保持这种状态。将建筑工程部分或全部委托给其他地方是不可取的。

一种选择,虽然需要一些手动管理,但它是围绕限制大小的通道的简单包装器。

public class LockingFixedPool<T>
{
private readonly Channel<T> Items;
private readonly SemaphoreSlim locker;

public LockingFixedPool(T[] seeded)
{
Items = Channel.CreateBounded<T>(seeded.Length);
foreach (var element in seeded)
{
Items.Writer.TryWrite(element);
}
}
public async Task<T> GetItemAsync()
{
return await Items.Reader.ReadAsync();
}
public void Release(T item)
{
//unless you make this specific to arrays,
//you should clear before calling release.
Items.Writer.TryWrite(item);
}
}

这种"裸"实现的最大问题是,您必须手动管理何时发布项目。 作为替代方案,您可以返回一个"包装"实例:

public class LockPooledItem<T> : IDisposable
{
public T Item {get;}
private LockingFixedPool<T> _pool;
public LockPooledItem(T item, LockingFixedPool<T> pool)
{
Item = item;
_pool = pool;
}
public void Dispose()
{
_pool.Release(Item);
}
}

并让 LockingFixedPool 返回这些实例。然后,您将分配跟踪对象,但这是围绕管理分配的组合和手动代码之间的权衡。

按评论编辑:

//This is for the sake of simplicity in example
SingletonHolder.LockedGibiByteArrayPool = new LockingArrayPool<byte[]>(new [] { new byte[1_000_000_000], new byte[1_000_000_000] });
Task producer1 = Task.Run(async () =>
{
while (true)
{
await channel.Writer.WaitToWriteAsync();
// Grab an array from our pool.
byte[] array = await 
SingletonHolder.LockedGibiByteArrayPool.GetItemAsync();
// Here initialize the array (mainly I/O bound, time consuming)
// The channel should not be full,
// But the reader -must- make sure to release the array
// when it is done.
// alternatively, use the 'LockPooledItem` pattern suggested,
// and then at least it's just a `Dispose()` call...
await channel.Writer.WriteAsync(array);
}
});

字节数组的构造和初始化是生产者的专属责任,它应该保持这种状态。将建筑工程部分或全部委托给其他地方是不可取的。

在这种情况下,您可以使用限制器来限制"令牌",如果您愿意,其中每个令牌都是要分配的授权。

public sealed class TokenAllocator
{
private readonly SemaphoreSlim _mutex;
public TokenAllocator(int maxTokens) =>
_mutex = new(maxTokens);
public async Task<IDisposable> AllocateAsync()
{
await _mutex.WaitAsync();
return Disposable.Create(() => _mutex.Release());
}
}

用法:

var allocator = new TokenAllocator(3);
var channel = Channel.CreateBounded<(IDisposable token, byte[] item)>(2);
var consumer = Task.Run(async () =>
{
await foreach (var (token, item) in channel.Reader.ReadAllAsync())
using (token)
{
... // Do something with `item`
}
});
var producer1 = Task.Run(async () =>
{
while (true)
{
var token = await allocator.AllocateAsync();
try
{
var item = new byte[1_000_000_000];
... // Do something with `item`
}
catch
{
token.Dispose();
throw;
}
await channel.Writer.WriteAsync((token, item));
}
});

缓冲的生产者/消费者系统(如通道和数据流)在最大缓冲区大小方面有点"模糊"(我永远不记得数据流是否计算输出缓冲区中的项目)。正如你所指出的,他们不计算生产者或消费者持有的任何物品。

因此,为了随时限制对象的总数,您将需要自己的分配器。

public sealed class LimitedAllocator<T>
{
private readonly SemaphoreSlim _mutex;
public LimitedAllocator(int maxItems) =>
_mutex = new(maxItems);
public async Task<AllocatedItem> AllocateAsync(Func<T> create)
{
await _mutex.WaitAsync();
return new(this, create());
}
private void Free() => _mutex.Release();
public sealed class AllocatedItem : IDisposable
{
public AllocatedItem(LimitedAllocator<T> allocator, T item)
{
Item = item;
_disposer = Disposable.Create(() => allocator.Free());
}
public T Item { get; }
public void Dispose() => _disposer.Dispose();
private readonly IDisposable _disposer;
}
}

用法:

var allocator = new LimitedAllocator<byte[]>(3);
var channel = Channel.CreateBounded<LimitedAllocator<byte[]>.AllocatedItem>(2);
var consumer = Task.Run(async () =>
{
await foreach (var allocatedItem in channel.Reader.ReadAllAsync())
using (allocatedItem)
{
... // Do something with allocatedItem.Item
}
});
var producer1 = Task.Run(async () =>
{
while (true)
{
var allocatedItem = await allocator.AllocateAsync(() => new byte[1_000_000_000]);
... // Do something with allocatedItem.Item
await channel.Writer.WriteAsync(allocatedItem);
}
});

笔记:

  • 创建者示例假定异常将导致应用失败。如果必须从中恢复异常,则生产者中的...需要一个try/catch,仅在异常的情况下处理分配的项目。
  • 如果LimitedAllocator<T>中的T只是内存(例如,byte[]),则可以考虑使用IMemoryOwner而不是AllocatedItemIMemoryOwner本质上是一次性的与记忆相结合的。
  • 生产者不再等待查看频道中是否有可用空间
  • ;它只是等待查看分配器中是否有可用空间。如果分配器中有可用空间,则该生产者将成为将创建要发送到频道的项目的生产者。
  • 如果您非常不喜欢IDisposable-with-item 的AllocatedItem配对,则可以使用连接的属性。但这倾向于更多的魔力和更难维护的代码。

这是另一种方法,它也使用了一个SemaphoreSlim,就像Stephen Cleary和to11mtm的答案一样。不同之处在于,信号灯的initialCountmaxCount被初始化为通道的确切容量,并且信号量由使用者在从通道中获取byte[]后立即释放,而不是在完全处理时释放:

const int capacity = 2;
Channel<byte[]> channel = Channel.CreateBounded<byte[]>(capacity);
SemaphoreSlim semaphore = new(capacity, capacity);
Task producer1 = Task.Run(async () =>
{
while (true)
{
await semaphore.WaitAsync();
try
{
byte[] array = new byte[1_000_000_000];
// Here initialize the array...
await channel.Writer.WriteAsync(array);
}
catch
{
semaphore.Release();
throw;
}
}
});
Task consumer = Task.Run(async () =>
{
await foreach (byte[] array in channel.Reader.ReadAllAsync())
{
semaphore.Release();
// Here consume the array...
}
});

本质上,SemaphoreSlim成为限制通道容量的防护装置。如果需要,您也可以使用无界通道。

此方法可确保在任何时候分配的最大byte[]数为 3,不包括符合垃圾回收条件的数。

相关内容

  • 没有找到相关文章

最新更新