从外部响应BackgroundService中的取消



如果我在。net Core BackgroundService中以调试模式启动以下示例:

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
Task.Run(async () => await Task.Delay(30000, stoppingToken))
.Wait(stoppingToken);
}
}

Ctrl + C取消事件不调用从CancellationTokenSource调用Cancel()StopAsync()方法。

我想我的问题和这篇文章类似。

当我在ExecuteAsync内部使用阻塞方法时,我如何捕获这些取消?

注。在现实世界中,我的ExecuteAsync一直在监视文件系统,直到在我的目标中创建了一个新文件。为了实现这个行为,我使用了filesystemobserver . waitforchanged()方法。

从注释来看,问题似乎与线程关系不大。真正的问题是如何停止FileSystemWatcher。

您不需要一个带有FileSystemWatcher的额外线程,您需要尽可能快地处理它的更改事件。您可以为此使用异步事件处理程序,或者更快地将事件发送到队列或通道进行处理。

要停止FSW,您可以使用CancellationToken。设置EnableRaisingEventsfalse的注册方法:

stoppingToken.Register(()=>watcher.EnableRaisingEvents=false);

事件处理

为了快速处理事件,可以将FileSystemEventArgs值直接发布到队列或通道,并与其他任务一起处理它们。这有两个好处:

  • 文件事件处理尽可能快,所以没有丢失
  • 代码可以等待所有事件完成,也可以取消它们。
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}

通道可以看作是一个具有异步读写操作的队列。ReadAllAsync方法将消息从队列中取出,直到停止,并将它们作为IAsyncEnumerable返回,这允许使用await foreach轻松地异步处理项。

管道和通道

代码可以重构成这样:

await watcher.AsChannel(stoppingToken)
.ProcessEvents(stoppingToken);

消费者

很容易将订阅者代码提取到一个单独的方法中。这甚至可以是一个扩展方法:

public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,CancellationToken stoppingToken)
{
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
}

并命名为:

var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);
await ProcessEvents(channel,stoppingToken);

这可以工作,因为Channel有对ChannelReaderChannelWriter的隐式强制转换操作符。

ChannelReader支持多个消费者,因此可以使用多个任务来处理事件,例如:

public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,int dop,CancellationToken stoppingToken)
{
var tasks=Enumerable.Range(0,dop).Select(()=>{
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
});
await Task.WhenAll(tasks);
}

生产者

也可以将通道创建和发布提取到单独的方法中。毕竟,我们只需要ChannelReader来处理:

public static ChannelReader AsChannel(this FileSystemWatcher watcher, CancellationToken stoppingToken)
{
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);
return channel;
}

并将所有内容组合在一个简单的管道中:

await watcher.AsChannel(stoppingToken)
.ProcessEvents(stoppingToken);

目前我的第一个解决方案是:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var blockingTask = Task.Run(async () => await Task.Delay(30000, stoppingToken));
await Task.WhenAny(blockingTask);
}
}

@Panagiotis Kanavos我很感激你的努力,我会回到你详细的帖子,如果我想改变我的"屏蔽";FSW转换为事件驱动的FSW

在生产中,我使用这样的东西:

private void DoServiceWork() 
{ 
// Some Work if new PDF or docx file is available 
// ...
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
int myTimeout = 1000 * 60 * 60; // 1 hour
while (!stoppingToken.IsCancellationRequested)
{
pdfWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory, "*.pdf", myTimeout, stoppingToken));
docWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory, "*.docx", myTimeout, stoppingToken));
var finishedTask = await Task.WhenAny(new Task<MyFSWResult>[] { waitPdfTask, waitXmpTask });
if(finishedTask.Result.Success) DoServiceWork();
}
}   

最新更新