Nifi 验证记录和转换记录以使用 avroschemaregistry 验证和转换 json 记录



我需要有关NiFi中ValidateRecordConvertRecord处理器的理解和功能的帮助。

我的要求

我有一个具有各种属性的 JSON 流文件内容,很少有字段是必填字段,很少是可选的,很少有字段是时间戳字段,很少是双精度类型。有些字段应该在 LOV 中具有值

我使用了AvroSchemaRegistry及以下的架构定义:

{
"namespace": "nifi",
"name": "test_json",
"type": "record",
"fields": [{
"name": "sn",
"type": "string"
}, {
"name": "result",
"type": {
"name": "Result",
"type": "enum",
"symbols": ["PASS", "FAIL"]
}
}, {
"name": "product",
"type": "string",
"maxLength": 8
}, {
"name": "test_station_name",
"type": "string",
"maxLength": 32
}, {
"name": "station_id",
"type": "string"
}, {
"name": "mac_address",
"type": "string"
}, {
"name": "start_time",
"type": {
"type": "string",
"logicalType": "timestamp-millis"
}
}, {
"name": "stop_time",
"type": {
"type": "string",
"logicalType": "timestamp-millis"
}
}, {
"name": "f_p_dip_wave",
"type": ["null", "double"]
}, {
"name": "f_p_dip_depth",
"type": ["null", "double"]
}, {
"name": "f_p_dip_height",
"type": ["null", "double"]
}, {
"name": "radius",
"type": ["null", "double"]
}, {
"name": "diameter",
"type": ["null", "double"]
}, {
"name": "gain_cavity_offset_nm",
"type": ["null", "double"]
}]
}

字段结果应具有 PASS 或 FAIL 值,并且 product 和 test_station_name 可以对值有最大长度限制,并且没有默认值"null"的字段是必需的。

源端,客户端应用程序可以在 json 中发送任何一组属性,我正在尝试使用 jsonreader 和 jsonwriter 使用 validaterecord 来验证记录,然后使用 convertrecord 根据架构适当地转换 json。

我可以看到NiFi能够检测字段名称并能够对数据类型(字符串,双精度和时间戳(执行验证,但不会失效 1.基于结果属性的枚举值(通过或失败除外(的记录 2.也不会使产品和test_station_name字段的记录长度大于架构中定义的最大长度的记录无效。3. 此外,即使 JSON 没有必填字段的属性和值,验证记录也会将这些字段视为"null"。 相反,所有 JSON 记录都已成功验证。

问题

NiFiValidateRecordConvertRecord可用于验证传入的 JSON 并将其转换为传出的 json,并具有上述一些验证规则。如果没有,是否有替代方法可以使用 groovy 脚本根据架构对传入的 json 执行此类验证和转换。

请指教。任何帮助将不胜感激。

1(目前来自Avro模式的枚举在NiFi的内部记录模式中转换为字符串类型,因此这就是传递任何值的原因。NiFi的记录架构中需要一个枚举类型,该枚举类型从Avro模式中捕获允许的值。

2(我在Avro规范中找不到有关maxLength的任何信息 - https://avro.apache.org/docs/current/spec.html 这是真的吗?如果是,那么NiFi可以考虑将其合并。

3(如果一个字段没有值,那么它应该是无效的,除非字段的类型是带有"null"的并集,例如"type":["null", "double"],这意味着该字段不是必需的,允许为null或double。

最新更新