如何在从平面文件中读取数据后,仅从Nifi Flow File中提取几列



平面文件有以下数据,但没有需要加载到MySQL表中的头。

101,AAA,1000,10
102,BBB,5000,20

我使用GetFile或GetSFTP处理器来读取数据。一旦读取了数据,流文件就会包含上述数据。我只想将第1列、第2列和第4列加载到MySQL表中。MySQL表中的输出如下。

101,AAA,10
102,BBB,20

你能帮助我如何从nifi中的传入流文件中只提取几列并将其加载到MySQL中吗?

这只是一种方法,但还有其他几种方法。此方法使用Records,并在其他方面避免修改基础数据——它只是忽略insert期间不需要的字段。这在与更大的Flow集成时是有益的,其中数据由其他可能需要原始数据的处理器使用,或者您已经在使用Records。

假设您的表有列

id | name | value

你的数据看起来像

101,AAA,1000,10
102,BBB,5000,20

您可以使用Unmatched Field BehaviorUnmatched Column Behavior设置为Ignore Unmatched...PutDatabaseRecord处理器,并添加CSVReader作为Record Reader

CSVReader中,您可以将Schema Access Strategy设置为Use 'Schema Text' Property。然后将Schema Text属性设置为以下内容:

{
"type": "record",
"namespace": "nifi",
"name": "db",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "ignoredField", "type": "string" },
{ "name": "value", "type": "string" }
]
} 

这将使NiFi Record字段与DB表列相匹配,DB表列将匹配字段1、2和4,而忽略字段3(因为它与列名不匹配(。

显然,修改Schema Text模式中的字段名称,使其与DB表的列名相匹配。您也可以在这里进行数据类型检查/转换。

PutDatabaseRecord

CSVReader

另一种方法可以是在ConvertRecord的帮助下使用将流文件转换为记录。它有助于转换为CSV格式,无论你喜欢什么,你仍然可以保持CSV格式。

但是,随着流文件成为记录,您现在可以使用其他处理器,如:QueryRecord,这样您就可以在流文件上运行类似SQL的命令:

"从FLOWFILE选择*";

在您的情况下,您可以执行:

"从FLOWFILE中选择col1、col2、col3";

您也可以直接应用过滤:

"从FLOWFILE中选择col1、col2、col3,其中col1>500〃;

我建议您阅读以下内容:

  • 查询记录教程

非常感谢pdeuxa和Sdars的回复。你的意见很有帮助。我试着用和你们两个类似的方法。我使用了convertRecord并配置了CSVRecordReaderCSVSetRecordWriterCSVRecordReader具有以下模式来读取数据

{
"type": "record",
"namespace": "nifi",
"name": "db",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "Salary", "type": "string" },
{ "name": "dept", "type": "string" }
]
} 

而CCD_ 17具有以下输出模式。输入模式中有4个字段,而输出模式只有3列。

{
"type": "record",
"namespace": "nifi",
"name": "db",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "dept", "type": "string" }
]
} 

我成功地运行了这个。谢谢你们的意见。

最新更新