如果我在。net Core BackgroundService中以调试模式启动以下示例:
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
Task.Run(async () => await Task.Delay(30000, stoppingToken))
.Wait(stoppingToken);
}
}
Ctrl + C
取消事件不调用从CancellationTokenSource
调用Cancel()
的StopAsync()
方法。
我想我的问题和这篇文章类似。
当我在ExecuteAsync内部使用阻塞方法时,我如何捕获这些取消?
注。在现实世界中,我的ExecuteAsync一直在监视文件系统,直到在我的目标中创建了一个新文件。为了实现这个行为,我使用了filesystemobserver . waitforchanged()方法。
从注释来看,问题似乎与线程关系不大。真正的问题是如何停止FileSystemWatcher。
您不需要一个带有FileSystemWatcher的额外线程,您需要尽可能快地处理它的更改事件。您可以为此使用异步事件处理程序,或者更快地将事件发送到队列或通道进行处理。
要停止FSW,您可以使用CancellationToken。设置EnableRaisingEvents
到false
的注册方法:
stoppingToken.Register(()=>watcher.EnableRaisingEvents=false);
事件处理
为了快速处理事件,可以将FileSystemEventArgs值直接发布到队列或通道,并与其他任务一起处理它们。这有两个好处:
- 文件事件处理尽可能快,所以没有丢失
- 代码可以等待所有事件完成,也可以取消它们。
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
通道可以看作是一个具有异步读写操作的队列。ReadAllAsync方法将消息从队列中取出,直到停止,并将它们作为IAsyncEnumerable返回,这允许使用await foreach
轻松地异步处理项。
管道和通道
代码可以重构成这样:
await watcher.AsChannel(stoppingToken)
.ProcessEvents(stoppingToken);
消费者
很容易将订阅者代码提取到一个单独的方法中。这甚至可以是一个扩展方法:
public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,CancellationToken stoppingToken)
{
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
}
并命名为:
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);
await ProcessEvents(channel,stoppingToken);
这可以工作,因为Channel
有对ChannelReader
和ChannelWriter
的隐式强制转换操作符。
ChannelReader支持多个消费者,因此可以使用多个任务来处理事件,例如:
public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,int dop,CancellationToken stoppingToken)
{
var tasks=Enumerable.Range(0,dop).Select(()=>{
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
});
await Task.WhenAll(tasks);
}
生产者
也可以将通道创建和发布提取到单独的方法中。毕竟,我们只需要ChannelReader来处理:
public static ChannelReader AsChannel(this FileSystemWatcher watcher, CancellationToken stoppingToken)
{
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);
return channel;
}
并将所有内容组合在一个简单的管道中:
await watcher.AsChannel(stoppingToken)
.ProcessEvents(stoppingToken);
目前我的第一个解决方案是:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var blockingTask = Task.Run(async () => await Task.Delay(30000, stoppingToken));
await Task.WhenAny(blockingTask);
}
}
@Panagiotis Kanavos我很感激你的努力,我会回到你详细的帖子,如果我想改变我的"屏蔽";FSW转换为事件驱动的FSW
在生产中,我使用这样的东西:
private void DoServiceWork()
{
// Some Work if new PDF or docx file is available
// ...
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
int myTimeout = 1000 * 60 * 60; // 1 hour
while (!stoppingToken.IsCancellationRequested)
{
pdfWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory, "*.pdf", myTimeout, stoppingToken));
docWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory, "*.docx", myTimeout, stoppingToken));
var finishedTask = await Task.WhenAny(new Task<MyFSWResult>[] { waitPdfTask, waitXmpTask });
if(finishedTask.Result.Success) DoServiceWork();
}
}