BufferBlock.ReceiveAsync(超时)挂起,但BufferBlock.RReceiveAsync()工



要么我做错了什么,但下面的消息永远不会返回,尽管指定了1秒的超时,但它永远挂在ReceiveAsync上。

我希望它在超时后返回null值。

/* snipped MyContainer class */
private readonly BufferBlock<byte[]> queue = new BufferBlock<byte[]>();
public async Task ExecuteAsync(CancellationToken stoppingToken)
{
// makes no difference if creating with TaskCreationOptions.LongRunning
await Task
.Factory
.StartNew(async () =>
{
while (stoppingToken.IsCancellationRequested == false)
{                     
// we get here OK, but no further if i use TimeSpan for delay
// without Timespan i.e. ReceiveAsync() only, it does **not** hang
var item = await 
this
.queue
.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
// process it, but we never get here we sleep forever
await ProcessAsync(item);
}
} /*,TaskCreationOptions.LongRunning*/);
// we get here and print the below OK
Console.WriteLine("thread created and running");
}
// this is called by the original (or some other) thread
// either if i call this or not, the above thread code still locks on ReceiveAsync
public void Add(byte[] data)
{
Console.WriteLine("adding");
this.queue.Post(data);
Console.WriteLine("done"); // gets posted OK     
}

重要更新-如果我没有指定延迟,可以正常工作

var item = await this.queue.ReceiveAsync());

如果我消除了延迟,代码就可以正常工作,但我每秒都会做一些后台内务处理(用于数据包计数器等(,所以如果在1秒内没有收到任何消息,这一点很重要。

其他注意事项:

我从一个通用的网络工作者主机调用上面的代码:

public class Worker : BackgroundService
{
private readonly MyContainer containerClass;
private readonly ILogger<Worker> logger;
public Worker(MyContainer containerClass, ILogger<Worker> logger)
{
this.containerClass = containerClass;
this.logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
this.containerClass.ExecuteAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{                
this.logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
await Task.Delay(1000, stoppingToken);
}
}
}

以上是在工人由IHostBuilder构建之后调用的,我称之为Host.Run()

我的理解是(我显然需要处理!(既然我创建了线程,它应该完全独立于创建/调用它的线程运行(而不是阻塞(…换句话说,它应该能够在线程本身内调用ReceiveAsync而不会被阻塞。

使用带有异步委托的Task.Factory.StartNew创建嵌套任务:

Task<Task> nestedTask = Task.Factory.StartNew(async () => { //...

你在等待外在的任务,而不是内在的任务,所以内在的任务变成了即发即弃的任务。使用await操作符两次可以在一行中等待两项任务:

await await Task.Factory.StartNew(async () => { //...

或者,您可以使用Unwrap方法将这两个任务合并为一个任务。

await Task.Factory.StartNew(async () => { /* ... */ }).Unwrap();

或者更好地使用Task.Run方法而不是Task.Factory.StartNew,因为前者理解异步委托,并为您进行展开:

await Task.Run(async () => { //...

如果你对Task.Factory.StartNewTask.Run之间的差异感兴趣,你可以在这里阅读一篇内容丰富的文章。

感谢所有响应的人,最后感谢Enrico(请随意复制/粘贴,我会给你分配答案(,代码实际上运行正常。

正在引发TimeoutException异常,但我的代码或Visual Studio没有捕捉到该异常。

根据启用所有CLR异常https://learn.microsoft.com/en-us/visualstudio/debugger/managing-exceptions-with-the-debugger?view=vs-2019年开始抛出异常。

然后我在代码中处理了异常,并能够按照我的设计要求进行:

public Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task
.Factory
.StartNew(async () => {
while (stoppingToken.IsCancellationRequested == false)
{
try
{
var ts = TimeSpan.FromSeconds(UpdateFrequencySeconds);
var item = await this.queue.ReceiveAsync(ts);
await ProcessAsync(item);
}
catch (TimeoutException)
{
// this is ok, timer expired 
}
catch (Exception e)
{
this.logger.LogError(e.ToString());
}
UpdateCounters();
}
await StopAsync();
},
stoppingToken,
TaskCreationOptions.LongRunning, 
TaskScheduler.Default)
.Unwrap();
}

最新更新