平面文件有以下数据,但没有需要加载到MySQL表中的头。
101,AAA,1000,10
102,BBB,5000,20
我使用GetFile或GetSFTP处理器来读取数据。一旦读取了数据,流文件就会包含上述数据。我只想将第1列、第2列和第4列加载到MySQL表中。MySQL表中的输出如下。
101,AAA,10
102,BBB,20
你能帮助我如何从nifi中的传入流文件中只提取几列并将其加载到MySQL中吗?
这只是一种方法,但还有其他几种方法。此方法使用Records,并在其他方面避免修改基础数据——它只是忽略insert
期间不需要的字段。这在与更大的Flow集成时是有益的,其中数据由其他可能需要原始数据的处理器使用,或者您已经在使用Records。
假设您的表有列
id | name | value
你的数据看起来像
101,AAA,1000,10
102,BBB,5000,20
您可以使用Unmatched Field Behavior
和Unmatched Column Behavior
设置为Ignore Unmatched...
的PutDatabaseRecord
处理器,并添加CSVReader
作为Record Reader
。
在CSVReader
中,您可以将Schema Access Strategy
设置为Use 'Schema Text' Property
。然后将Schema Text
属性设置为以下内容:
{
"type": "record",
"namespace": "nifi",
"name": "db",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "ignoredField", "type": "string" },
{ "name": "value", "type": "string" }
]
}
这将使NiFi Record字段与DB表列相匹配,DB表列将匹配字段1、2和4,而忽略字段3(因为它与列名不匹配(。
显然,修改Schema Text
模式中的字段名称,使其与DB表的列名相匹配。您也可以在这里进行数据类型检查/转换。
PutDatabaseRecord
CSVReader
另一种方法可以是在ConvertRecord的帮助下使用将流文件转换为记录。它有助于转换为CSV格式,无论你喜欢什么,你仍然可以保持CSV格式。
但是,随着流文件成为记录,您现在可以使用其他处理器,如:QueryRecord,这样您就可以在流文件上运行类似SQL的命令:
"从FLOWFILE选择*";
在您的情况下,您可以执行:
"从FLOWFILE中选择col1、col2、col3";
您也可以直接应用过滤:
"从FLOWFILE中选择col1、col2、col3,其中col1>500〃;
我建议您阅读以下内容:
- 查询记录教程
非常感谢pdeuxa和Sdars的回复。你的意见很有帮助。我试着用和你们两个类似的方法。我使用了convertRecord
并配置了CSVRecordReader
和CSVSetRecordWriter
。CSVRecordReader
具有以下模式来读取数据
{
"type": "record",
"namespace": "nifi",
"name": "db",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "Salary", "type": "string" },
{ "name": "dept", "type": "string" }
]
}
而CCD_ 17具有以下输出模式。输入模式中有4个字段,而输出模式只有3列。
{
"type": "record",
"namespace": "nifi",
"name": "db",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "dept", "type": "string" }
]
}
我成功地运行了这个。谢谢你们的意见。