TPL具有内部生命期的数据流块



我一直在做一些TPL数据流编码,我对这些基础知识很满意。我的问题是,我该如何做一个TPL块,除了对它的队列做出反应,它自己也有生命?

比如,一个在永久循环中运行的后台任务,每隔几秒钟轮询一次奇怪的web服务或数据库,并在它认为合适的时候在其输出上发出消息?

接口将是源和目标块,但源消息和目标消息之间没有明显的连接。

基本上是一个"活跃的"块。

block不是后台任务或worker。在管道和信息之外,它们没有自己的生命。但这并不意味着你不能按照自己的要求去做。不过,你必须从外部触发header块。

一种方法是创建一个"ping "的计时器。管道的头块:

var head=new TransformBlock<int,...>(...);
...
var timer=new System.Threading.Timer(_=>head.PostAsync(0),null,0,5000);

您可以根据需要启动、停止或更改定时器。

你可以使用这个例子来周期性地处理文件夹中的文件:

var crawler=new TransformManyBlock<string,string>(root=>{
return Directory.EnumerateFiles(root,"*.csv");
});
var parseCsv=new TransformBlock<string,Record[]>(filePath=>{
var records=await parseCsvAsync(filePath);
return records;
}
...
var timer=new System.Threading.Timer(_=>head.PostAsync(rootFolder),null,0,5000);

所以,这是我最后做的:我的FSM类由一个广播块(作为输入)、一个缓冲块(作为输出)和一个在线程池中运行无限循环的异步方法组成。异步方法在需要的时候从broadcastblock中获取当前输入值,完成它的任务并将结果发布到缓冲块,在我的例子中是每秒一次。

该FSM类的用户可以链接到广播块的输入和缓冲块的输出。异步方法在需要时轮询输入并获取新值。无论它做什么,结果都会进入缓冲区块。

接口看起来像

public interface IFsmBlock : IAsyncDisposable
{
ITargetBlock<string?> SearchParameterInputPort { get; }
ISourceBlock<string> FoundEntriesOutputPort { get; }
}

可能有多个输入或输出块。在这种情况下,FSM基本上将输入端口视为包含值的变量。如果输入端口应该触发某些东西,那么很容易通过使用动作块而不是缓冲块来实现。

最新更新