为什么级联ManualResetEvents上的多次等待会使执行时间增加三倍



我已经尽可能地简化了这个场景。在下面你可以找到完整的代码,用它你可以自己测试程序。在我的用例中,我必须等待来自另一个线程的重置事件-让我们调用子事件。事件subEvent会再次发出另一个线程完成的信号。让我们称之为重置事件。在这个简化版本中,我在200ms后发出resetEvent信号,并立即发出

subEvent此示例如预期的那样工作。我的输出是

Sub Thread: 229 ms
Main 229 ms
Thread: 229 ms

30毫秒仍然是一个沉重的代价,但当我增加两个重置事件的等待次数时,情况会变得更糟——这更接近我在程序中的场景。在方法MultipleCascaded中,我创建了5个等待重置事件(在200毫秒后设置(的线程,以及15个等待子事件-(在重置事件之后设置(的螺纹。

Set reset event 641
Main 641 ms
Thread 4: 660 ms
Thread 1: 661 ms
Thread 3: 662 ms
Sub Thread 4.0.: 662 ms
Sub Thread 4.1.: 663 ms
Sub Thread 0.0.: 661 ms
Sub Thread 0.1.: 661 ms
Sub Thread 1.1.: 664 ms
Sub Thread 1.2.: 664 ms
Sub Thread 3.0.: 665 ms
Sub Thread 3.1.: 665 ms
Sub Thread 3.2.: 666 ms
Thread 0: 661 ms
Sub Thread 4.2.: 663 ms

这里有趣的部分是,重置事件的SetWait方法不会产生时间间隔。在这种情况下,计时器似乎无法在指定时间之后执行。

当然,在我的应用程序中,这些都不是定时器,比如I/O访问、计算等等。但效果是一样的。在执行过程中有一个明显的时间间隔,似乎什么都没发生。我在其他场景中也遇到了这个问题(例如,过度使用async/await(。但在那里,我无法在一个小例子中重现它。

我现在的问题是,发生了什么事?更重要的是,我该如何解决这个问题?以下是我发现的问题:

  • 线程池中的所有线程都被阻塞-有很多线程可以同时处于活动状态
  • 定时器——正如我在实际应用程序中提到的,我做I/O访问和计算工作。没有计时器
  • 这个特殊重置事件的实现-当我使用ManualResetEventAutoResetEvent时也会发生这种情况,在我的旧应用程序中,我使用async/await;在这种情况下,它是TaskCompletionSource

现在是测试它的代码。

class Program
{
static void Main(string[] args)
{
SingleCascaded();
MultipleCascaded();
}
private static void MultipleCascaded()
{
Console.WriteLine("Test multiple waits cascaded");
ManualResetEventSlim resetEvent = new ManualResetEventSlim(false);
Stopwatch watch = new Stopwatch();
watch.Start();
for (int j = 0; j < 5; j++)
{
ThreadPool.QueueUserWorkItem(state =>
{
ManualResetEventSlim subEvent = new ManualResetEventSlim(false);
for (int k = 0; k < 3; k++)
{
ThreadPool.QueueUserWorkItem(state =>
{
subEvent.Wait();
Console.WriteLine(
$"Sub Thread {((Tuple<int, int>) state).Item1}.{((Tuple<int, int>) state).Item2}.: {watch.ElapsedMilliseconds} ms");
}, new Tuple<int, int>((int) state, k));
}
resetEvent.Wait();
subEvent.Set();
Console.WriteLine($"Thread {state}: {watch.ElapsedMilliseconds} ms");
}, j);
}
using Timer timer = new Timer(state =>
{
Console.WriteLine($"Set reset event {watch.ElapsedMilliseconds}");
resetEvent.Set();
}, null, 200,
Timeout.Infinite);
}
private static void SingleCascaded()
{
Console.WriteLine("Test single waits cascaded");

ManualResetEventSlim resetEvent = new ManualResetEventSlim(false);
Stopwatch watch = new Stopwatch();
watch.Start();

ThreadPool.QueueUserWorkItem(state =>
{
ManualResetEventSlim subEvent = new ManualResetEventSlim(false);
ThreadPool.QueueUserWorkItem(state =>
{
subEvent.Wait();
Console.WriteLine(
$"Sub Thread: {watch.ElapsedMilliseconds} ms");
});
resetEvent.Wait();
subEvent.Set();
Console.WriteLine($"Thread: {watch.ElapsedMilliseconds} ms");
});
using Timer timer = new Timer(state => { resetEvent.Set(); }, null, 200,
Timeout.Infinite);
resetEvent.Wait();
Console.WriteLine($"Main {watch.ElapsedMilliseconds} ms");
}
}

根据我的评论,这是因为循环使用了ThreadPool中的所有可用线程,然后需要开始使用算法添加更多线程,并且Timer回调(resetevent.set(在ThreadPool线程上运行。此外,即使应用程序中没有计时器,如果I/O在线程池线程上运行,问题也是一样的。

对此的一个解决方案是在循环之前使用将立即可用的按需线程的数量设置为更高的数量

// Get number of immediately available threads
ThreadPool.GetMinThreads(out var a, out var b);
Console.WriteLine($"worker         : " + a); // on my machine: 12
Console.WriteLine($"completion port: " + b); // on my machine: 12
// Set minimum number of immediately available threads.
// First parameter (worked threads) is relevant here:
ThreadPool.SetMinThreads(30, 12);

您可以通过更改这些数字或更改循环中的上限(=所需线程数(来验证这一点。它与Reset事件的类型无关,因为在这两种情况下,工作线程都会阻塞SingleCascaded0。

Thread.Sleep()替换计时器也可以验证这一点。这不会在ThreadPool上运行;相反,它阻塞了主线程:

Thread.Sleep(10000); // Block main thread for 10secs for demo purposes.
resetEvent.Set();

然后你可以看到12个线程立即执行,然后在我的机器上每500-1000秒一个接一个地添加下一个线程。10秒后,设置事件并解除线程阻塞:

START
Thread wait: 13 ms
Thread wait: 13 ms
Thread wait: 13 ms
Thread wait: 13 ms
Thread wait: 13 ms
Sub Thread wait: 16 ms
Sub Thread wait: 16 ms
Sub Thread wait: 17 ms
Sub Thread wait: 19 ms
Sub Thread wait: 21 ms
Sub Thread wait: 23 ms
Sub Thread wait: 25 ms    // 12th thread
Sub Thread wait: 1014 ms  // no more threads available, start adding them one by one
Sub Thread wait: 2003 ms
Sub Thread wait: 2517 ms
Sub Thread wait: 3511 ms
Sub Thread wait: 4518 ms
Sub Thread wait: 5517 ms
Sub Thread wait: 6512 ms
Sub Thread wait: 7507 ms
Set event: 10013 ms               // set: all threads are released
Sub Thread go  : 10013 ms
Thread go  : 10013 ms
Sub Thread go  : 10013 ms
Sub Thread go  : 10013 ms

您也可以将Sleep((设置为200毫秒,而不是10秒。您将看到,在这种情况下,线程将在大约200ms后运行,因为不需要像使用Timer那样在ThreadPool上等待。

MS文档:
SetMinThreads
计时器

最新更新