我有一个包含无穷无尽while
循环的异步方法。此循环由程序的主线程频繁暂停和恢复。为此,我目前使用的是Nito.AsyncEx软件包中的PauseTokenSource
类:
private readonly PauseTokenSource _pausation = new();
private async Task StartWorker()
{
while (true)
{
await _pausation.Token.WaitWhilePausedAsync();
DoSomething();
}
}
这工作得很好,但我注意到每次暂停和恢复PauseTokenSource
时都会分配大约 100 个字节。由于这种情况每秒发生多次,因此我正在寻找一种消除此开销的方法。我的想法是用自制的pausation机制替换PauseTokenSource
,该机制具有返回ValueTask
而不是Task
的WaitWhilePausedAsync
方法。为了使实现无需分配,ValueTask
应由实现IValueTaskSource
接口的东西支持。这是我目前(失败)实现此机制的尝试:
public class Pausation : IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _source;
private bool _paused;
public Pausation() => _source.RunContinuationsAsynchronously = true;
public void Pause()
{
_paused = true;
_source.Reset();
}
public void Resume()
{
_paused = false;
_source.SetResult(default);
}
public ValueTask WaitWhilePausedAsync()
{
if (!_paused) return ValueTask.CompletedTask;
return new ValueTask(this, _source.Version);
}
void IValueTaskSource.GetResult(short token)
{
_source.GetResult(token);
}
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
return _source.GetStatus(token);
}
void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
short token, ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
}
我的Pausation
类基于ManualResetValueTaskSourceCore<T>
结构,它应该简化最常见的IValueTaskSource
实现。显然我做错了什么,因为当我的工作await
WaitWhilePausedAsync
方法时,它会崩溃并显示InvalidOperationException
.
我的问题是:如何修复Pausation
类,使其正常工作?我是否应该在_paused
字段之外添加更多状态?我是否在错误的地方调用了ManualResetValueTaskSourceCore<T>
方法?我要求详细的修复说明,或者要求完整且有效的实施。
细节:Pausation
类旨在用于单工作器 - 单控制器方案。只有一个异步工作线程(StartWorker
方法),只有一个控制器线程发出Pause
和Resume
命令。此外,也不需要取消支持。工作人员的终止由CancellationTokenSource
独立处理(为简洁起见,从上面的代码片段中删除)。唯一需要的功能是Pause
、Resume
和WaitWhilePausedAsync
方法。唯一的要求是它正常工作,并且不分配内存。
可在此处找到我的工作器 - 控制器方案的可运行在线演示。
使用Nito.AsyncEx.PauseTokenSource
类输出:
Controller loops: 112,748
Worker loops: 84, paused: 36 times
我的Pausation
类输出:
Controller loops: 117,397
Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.GetStatus(Int16 token)
at Program.Pausation.System.Threading.Tasks.Sources.IValueTaskSource.GetStatus(Int16 token)
at Program.<>c__DisplayClass0_0.<<Main>g__StartWorker|0>d.MoveNext()
我相信你误解了Reset
.ManualResetValueTaskSourceCore<T>.Reset
完全不像ManualResetEvent.Reset
.对于ManualResetValueTaskSourceCore<T>
,Reset
表示"以前的操作已经完成,现在我想重用值任务,所以更改版本"。因此,它应该在GetResult
之后调用(即,在暂停的代码完成等待之后),而不是在Pause
内。封装此 IMO 的最简单方法是在返回ValueTask
之前立即Reset
状态。
同样,除非已返回值任务,否则代码不应调用SetResult
。ManualResetValueTaskSourceCore<T>
实际上是围绕顺序操作非常严格地设计的,并且拥有一个单独的"控制器"会使事情复杂化。您可以通过跟踪使用者是否正在等待,并且仅在有等待时才尝试完成来使其工作:
public class Pausation : IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _source;
private readonly object _mutex = new();
private bool _paused;
private bool _waiting;
public Pausation() => _source.RunContinuationsAsynchronously = true;
public void Pause()
{
lock (_mutex)
_paused = true;
}
public void Resume()
{
var wasWaiting = false;
lock (_mutex)
{
wasWaiting = _waiting;
_paused = _waiting = false;
}
if (wasWaiting)
_source.SetResult(default);
}
public ValueTask WaitWhilePausedAsync()
{
lock (_mutex)
{
if (!_paused) return ValueTask.CompletedTask;
_waiting = true;
_source.Reset();
return new ValueTask(this, _source.Version);
}
}
void IValueTaskSource.GetResult(short token)
{
_source.GetResult(token);
}
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
return _source.GetStatus(token);
}
void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
}
我使用互斥锁修复了一些明显的竞争条件,但我不能保证没有更微妙的竞争条件。至少,它肯定会仅限于单个控制器和单个消费者。
您可能误解了ValueTask
的目的。斯蒂芬·图布(Stephen Toub)分解了为什么在这里引入它:https://devblogs.microsoft.com/dotnet/understanding-whys-whats-and-when's-of-valuetask/
但最相关的信息是,如果异步方法同步且成功完成,ValueTask
不会分配。
如果它确实需要异步完成,则由于后台的状态机,无需分配Task
类。因此,这可能完全需要另一种解决方案。