如何将TPL数据流TranformBlock或ActionBlock放在单独的文件中



我想将TPL数据流用于我的.NET Core应用程序,并遵循了文档中的示例。

我不想把所有的逻辑都放在一个文件中,而是想把每个TransformBlockActionBlock(我还不需要其他的)分离成它们自己的文件。将整数转换为字符串的小型TransformBlock示例

class IntToStringTransformer : TransformBlock<int, string>
{
public IntToStringTransformer() : base(number => number.ToString()) { }
}

以及向控制台写入字符串的小型CCD_ 4示例

class StringWriter : ActionBlock<string>
{
public StringWriter() : base(Console.WriteLine) { }
}

不幸的是,这不会起作用,因为块类是密封的。有没有一种方法可以将这些块组织到它们自己的文件中?

数据流步骤/块/goroutines本质上是功能性的,最好作为工厂函数的模块来组织,而不是单独的类。TPL数据流管道与F#或任何其他语言中的函数调用管道非常相似。事实上,人们可以将其视为PowerShell管道,只是它更容易编写。

不需要创建一个类或实现一个接口来向该管道添加新函数,只需添加它并将输出重定向到下一个函数即可。

TPL数据流块已经提供了构造管道的原语,并且只需要转换函数。这就是为什么它们是密封的,以防止滥用。

组织数据流的自然方式也类似于F#——使用执行每项工作的函数创建库,将它们放入相关函数的模块中。这些函数是无状态的,所以它们可以像扩展方法一样轻松地进入静态库。

例如,可以有一个模块用于执行批量插入或读取数据的数据库相关函数,另一个模块处理到各种文件格式的导出,单独的类用于调用外部web服务,另一种模块用于解析特定的消息格式。

一个真实的例子

在过去的7年里,我为一家在线旅行社(OTA)处理了几个复杂的管道。其中一个呼叫几个GDS(在线旅行社和航空公司之间的中介机构)来检索交易信息——机票问题、退款、取消等。下一步检索机票记录,即详细的机票信息。最后,将记录插入到数据库中。

GDS太大了,无法满足标准,所以他们的";SOAP";web服务甚至不符合SOAP,更不用说遵循WS-*标准了。因此,每个GDS都需要一个单独的类库来调用服务并解析输出。目前还没有数据流,该项目已经足够复杂了

将数据写入数据库几乎总是一样的,因此有一个单独的项目,其方法采用IEnumerable<T>并使用SqlBulkCopy将其写入数据库。

然而,仅仅加载新数据是不够的,事情经常出错,所以我需要能够加载已经存储的机票信息。

组织

保持理智:

  • 每个管道都有自己的文件:
    • 加载新数据的每日管道
    • 重新加载管道以加载所有存储的数据
    • A";重新运行";使用现有数据的管道再次询问任何丢失的数据
  • 静态类用于分别保存worker函数和工厂方法,这些方法根据配置生成数据流块。例如,CreateLogger(path,level)创建记录特定消息的ActionBlock<Message>
  • 常见的数据流扩展方法-由于dataflow块遵循相同的基本模式,因此很容易通过组合Func<TIn,TOut>和记录器块来创建日志记录块。或者创建一个LinkTo重载,将坏记录重定向到记录器或数据库。这些都很常见,可以成为扩展方法

如果它们在同一个文件中,那么很难在不影响另一个管道的情况下编辑一个管道。此外,除了核心任务之外,管道还有很多其他任务,例如:

  • 日志记录
  • 处理不良记录和部分结果(无法停止10个错误的10万导入)
  • 错误处理(与处理坏记录不同)
  • 监控-这个怪物在最后15分钟里在做什么?DOP=10是否能提高性能

不要创建父管道类

有些步骤是常见的,所以一开始,我创建了一个父类,其中包含重载的常见步骤,或者只是在子类中替换了这些步骤非常糟糕的想法。每个管道都是相似的,但并不完全相同,继承意味着修改一个步骤或一个连接可能会破坏一切。大约一年后,事情变得难以忍受,所以我把家长班分成了不同的班。

正如@Panagiotis所解释的,我认为你必须稍微抛开OOP心态。DataFlow提供的是构建块,您可以对其进行配置以执行所需内容。我将尝试创建一个小例子来说明我的意思:

// Interface and impl. are in separate files. Actually, they could 
// even be in a different project ...
public interface IMyComplicatedTransform
{
Task<string> TransformFunction(int input);
}
public class MyComplicatedTransform : IMyComplicatedTransform
{
public Task<string> IMyComplicatedTransform.TransformFunction(int input)
{
// Some complex logic
}
}
class DataFlowUsingClass{
private readonly IMyComplicatedTransform myTransformer;
private readonly TransformBlock<int , string> myTransform;
// ... some more blocks ...
public DataFlowUsingClass()
{
myTransformer = new MyComplicatedTransform(); // maybe use ctor injection?
CreatePipeline();
}
private void CreatePipeline()
{
// create blocks
myTransform = new TransformBlock<int, string>(myTransformer.TransformFunction);
// ... init some more blocks
// TODO link blocks
}
}

我认为这是最接近你想要做的事情。

最终得到的是一组可以独立测试的接口和实现。客户基本上可以归结为";gluecode";。

编辑:正如@Panagiotis正确指出的那样,接口甚至是超流的。你可以不用。

最新更新