如何在不产生内存分配开销的情况下暂停/恢复异步工作线程?



我有一个包含无穷无尽while循环的异步方法。此循环由程序的主线程频繁暂停和恢复。为此,我目前使用的是Nito.AsyncEx软件包中的PauseTokenSource类:

private readonly PauseTokenSource _pausation = new();
private async Task StartWorker()
{
while (true)
{
await _pausation.Token.WaitWhilePausedAsync();
DoSomething();
}
}

这工作得很好,但我注意到每次暂停和恢复PauseTokenSource时都会分配大约 100 个字节。由于这种情况每秒发生多次,因此我正在寻找一种消除此开销的方法。我的想法是用自制的pausation机制替换PauseTokenSource,该机制具有返回ValueTask而不是TaskWaitWhilePausedAsync方法。为了使实现无需分配,ValueTask应由实现IValueTaskSource接口的东西支持。这是我目前(失败)实现此机制的尝试:

public class Pausation : IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _source;
private bool _paused;
public Pausation() => _source.RunContinuationsAsynchronously = true;
public void Pause()
{
_paused = true;
_source.Reset();
}
public void Resume()
{
_paused = false;
_source.SetResult(default);
}
public ValueTask WaitWhilePausedAsync()
{
if (!_paused) return ValueTask.CompletedTask;
return new ValueTask(this, _source.Version);
}
void IValueTaskSource.GetResult(short token)
{
_source.GetResult(token);
}
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
return _source.GetStatus(token);
}
void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
short token, ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
}

我的Pausation类基于ManualResetValueTaskSourceCore<T>结构,它应该简化最常见的IValueTaskSource实现。显然我做错了什么,因为当我的工作awaitWaitWhilePausedAsync方法时,它会崩溃并显示InvalidOperationException.

我的问题是:如何修复Pausation类,使其正常工作?我是否应该在_paused字段之外添加更多状态?我是否在错误的地方调用了ManualResetValueTaskSourceCore<T>方法?我要求详细的修复说明,或者要求完整且有效的实施。

细节:Pausation类旨在用于单工作器 - 单控制器方案。只有一个异步工作线程(StartWorker方法),只有一个控制器线程发出PauseResume命令。此外,也不需要取消支持。工作人员的终止由CancellationTokenSource独立处理(为简洁起见,从上面的代码片段中删除)。唯一需要的功能是PauseResumeWaitWhilePausedAsync方法。唯一的要求是它正常工作,并且不分配内存。

可在此处找到我的工作器 - 控制器方案的可运行在线演示。

使用Nito.AsyncEx.PauseTokenSource类输出:

Controller loops: 112,748
Worker loops: 84, paused: 36 times

我的Pausation类输出:

Controller loops: 117,397
Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.GetStatus(Int16 token)
at Program.Pausation.System.Threading.Tasks.Sources.IValueTaskSource.GetStatus(Int16 token)
at Program.<>c__DisplayClass0_0.<<Main>g__StartWorker|0>d.MoveNext()

我相信你误解了Reset.ManualResetValueTaskSourceCore<T>.Reset完全不像ManualResetEvent.Reset.对于ManualResetValueTaskSourceCore<T>Reset表示"以前的操作已经完成,现在我想重用值任务,所以更改版本"。因此,它应该在GetResult之后调用(即,在暂停的代码完成等待之后),而不是在Pause内。封装此 IMO 的最简单方法是在返回ValueTask之前立即Reset状态。

同样,除非已返回值任务,否则代码不应调用SetResultManualResetValueTaskSourceCore<T>实际上是围绕顺序操作非常严格地设计的,并且拥有一个单独的"控制器"会使事情复杂化。您可以通过跟踪使用者是否正在等待,并且仅在有等待时才尝试完成来使其工作:

public class Pausation : IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _source;
private readonly object _mutex = new();
private bool _paused;
private bool _waiting;
public Pausation() => _source.RunContinuationsAsynchronously = true;
public void Pause()
{
lock (_mutex)
_paused = true;
}
public void Resume()
{
var wasWaiting = false;
lock (_mutex)
{
wasWaiting = _waiting;
_paused = _waiting = false;
}
if (wasWaiting)
_source.SetResult(default);
}
public ValueTask WaitWhilePausedAsync()
{
lock (_mutex)
{
if (!_paused) return ValueTask.CompletedTask;
_waiting = true;
_source.Reset();
return new ValueTask(this, _source.Version);
}
}
void IValueTaskSource.GetResult(short token)
{
_source.GetResult(token);
}
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
return _source.GetStatus(token);
}
void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
}

我使用互斥锁修复了一些明显的竞争条件,但我不能保证没有更微妙的竞争条件。至少,它肯定会仅限于单个控制器和单个消费者。

您可能误解了ValueTask的目的。斯蒂芬·图布(Stephen Toub)分解了为什么在这里引入它:https://devblogs.microsoft.com/dotnet/understanding-whys-whats-and-when's-of-valuetask/

但最相关的信息是,如果异步方法同步且成功完成,ValueTask不会分配。

如果它确实需要异步完成,则由于后台的状态机,无需分配Task类。因此,这可能完全需要另一种解决方案。

相关内容

最新更新