我有一个使用Parallel.For的内存问题。
是否有可能强制TPL在不同的Parallel.For中始终使用相同的线程索引?即使我在所有不同的并行中使用MaxDegreeOfParallelism = Environment.ProcessorCount
。For, TPL在两个连续的Parallel.For之间并不完全使用相同的线程池。我有记忆问题,因为这个问题。
我不明白为什么如果我的Environment.ProcessorCount = 2
, TPL不在开始时只创建2个线程,并将其用于所有连续的Parallel.For。这是我的目标。
我怎么能做到呢?
不,不可能。
如果你想手动控制线程,TPL不是正确的抽象。使用系统。线程直接。
这是可行的,但您需要使用自定义TaskScheduler
配置并行循环,这很棘手。这个TaskScheduler
实际上是一个自定义线程池,具有固定数量的线程,与内置的ThreadPool
完全无关。启动和终止这些线程的责任将完全取决于您。下面是这样一个池的最小实现:
public class CustomThreadPool : TaskScheduler, IDisposable
{
private readonly BlockingCollection<Task> _queue;
private readonly Thread[] _threads;
public CustomThreadPool(int threadsCount)
{
_queue = new BlockingCollection<Task>();
_threads = Enumerable.Range(0, threadsCount).Select(_ => new Thread(() =>
{
foreach (var task in _queue.GetConsumingEnumerable())
TryExecuteTask(task);
})).ToArray();
Array.ForEach(_threads, t => t.IsBackground = true);
Array.ForEach(_threads, t => t.Start());
}
protected override void QueueTask(Task task) => _queue.Add(task);
protected override bool TryExecuteTaskInline(Task task,
bool taskWasPreviouslyQueued)
{
if (Array.IndexOf(_threads, Thread.CurrentThread) < 0) return false;
return TryExecuteTask(task);
}
public override int MaximumConcurrencyLevel => _threads.Length;
protected override IEnumerable<Task> GetScheduledTasks() => _queue;
public void Dispose()
{
_queue.CompleteAdding();
Array.ForEach(_threads, t => t.Join());
_queue.Dispose();
}
}
你可以像这样使用Parallel.For
循环:
using CustomThreadPool customThreadPool = new(threadsCount: 2);
ParallelOptions options = new()
{
TaskScheduler = customThreadPool,
MaxDegreeOfParallelism = customThreadPool.MaximumConcurrencyLevel,
};
Parallel.For(1, 10, options, i =>
{
Console.WriteLine($"i: {i}, Thread #{Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(100);
});
样本输出:i: 5, Thread #4
i: 1, Thread #5
i: 2, Thread #5
i: 6, Thread #4
i: 3, Thread #5
i: 7, Thread #4
i: 4, Thread #5
i: 8, Thread #4
i: 9, Thread #5
在线演示。
只要你使用相同的CustomThreadPool
实例,你所有的并行循环将在相同的线程#4和#5上运行。