为排队的基于定时器的HTTP调用创建异步方法



我有一个类,它包含一个请求队列,这些请求将被收集并在最大1秒的时间间隔后通过HTTP调用发送到Web API:

public class AsyncValueTimerIntervalWriter
{
private class ValueRequest
{
public string FullName { get; set; }
public object Value { get; set; }
}
private readonly IValuesClient _valuesClient; // auto generated Swagger HTTP client
private List<ValueRequest> _valueRequests = new List<ValueRequest>();
private object _valuesLock = new object();
private Timer _timer;
public AsyncValueTimerIntervalWriter(IValuesClient valuesClient)
{
_valuesClient = valuesClient;
}
public void Start() 
{
_timer = new Timer(o => WriteValuesToServer(), null, 0, 1000);
}
public void Stop() 
{
_timer?.Dispose();
_timer = null;
}
public void AddWrite(string fullName, object value)
{
lock (_valuesLock)
{
_valueRequests.Add(new ValueRequest { FullName = fullName, Value = value });
}
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
}
}
}

呼叫者示例:

var asyncWriter = new AsyncValueTimerIntervalWriter(...);
asyncWriter.AddWrite("My.Var.Tree.VarName", 1234);
asyncWriter.AddWrite("My.Var.Tree.AnotherVar", "Test");
// after max 1 sec the values are written to server

我的目标是编写一个异步方法,它还会添加一个要写入的值,并在写入值时返回:

await asyncWriter.WriteAsync("My.Var.Tree.VarName", 1234);
// should continue after written to server

重要信息:我需要处理队列中的请求,因为写入程序可能随时停止,并且不允许释放请求。再次启动写入程序后,需要将添加的请求发送到服务器。

我试着使用手动重置事件,但感觉很奇怪:

...
public Task WriteAsync(string fullName, object value)
{
var resetEvent = new ManualResetEvent(false);
lock (_valuesLock)
{
_valueRequests.Add(
new ValueRequest 
{ 
FullName = fullName, 
Value = value, 
CompletedEvent = resetEvent 
});
}
resetEvent.WaitOne();
return Task.CompletedTask;
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
value.CompletedEvent?.Set();
}
}
...

有什么建议吗?

您可以在ValueEntry类中使用TaskCompletionSource将信号从编写器传递给调用方。

private class ValueEntry
{
public string FullName { get; set; }
public object Value { get; set; }
protected readonly TaskCompletionSource _tcs = new TaskCompleteionSource();
public Task AwaitCompletion()
{
return _tcs.Task;
}
public Task MarkComplete()
{
return _tcs.SetResult();
}
}

WriteValuesToServer:的微小更改

public async Task WriteValuesToServer()
{
// snip 
if (values.Any())
{
await _emsClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
await value.MarkComplete();
}
}

现在你的作者很简单:

public Task WriteAsync(string fullName, object value)
{
var request = new ValueRequest { FullName = fullName, Value = value };
lock (_valuesLock)
{
_valueRequests.Add(request)
};
await request.AwaitCompletion();
}

此外,我建议您考虑使用BlockingCollection,它是为处理生产者/消费者队列而设计的,可以让您摆脱大部分lock块。

最新更新