Create a PostgreSQL table from an Avro schema in NiFi



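Using InferAvroSchema, I got the Avro schema of my file. I want to use this Avro schema to create a table in PostgreSQL. Which processor do I have to use?

My flow is: GetFile -> InferAvroSchema -> (here I want to create a table from this schema) -> PutDatabaseRecord.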

Avro schema:

{
  "type" : "record",
  "name" : "warranty",
  "doc" : "Schema generated by Kite",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "doc" : "Type inferred from '1'"
  }, {
    "name" : "train_id",
    "type" : "long",
    "doc" : "Type inferred from '21691'"
  }, {
    "name" : "siemens_nr",
    "type" : "string",
    "doc" : "Type inferred from 'Loco-001'"
  }, {
    "name" : "uic_nr",
    "type" : "long",
    "doc" : "Type inferred from '193901'"
  }, {
    "name" : "Configuration",
    "type" : "string",
    "doc" : "Type inferred from 'ZP28'"
  }, {
    "name" : "Warranty_Status",
    "type" : "string",
    "doc" : "Type inferred from 'Out_of_Warranty'"
  }, {
    "name" : "Warranty_Data_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Real_based_on_preliminary_acceptance_date'"
  }, {
    "name" : "of_progression",
    "type" : "long",
    "doc" : "Type inferred from '100'"
  }, {
    "name" : "Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2009'"
  }, {
    "name" : "Warranty_on_Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2013'"
  }, {
    "name" : "Customer_Status",
    "type" : "string",
    "doc" : "Type inferred from 'homologation'"
  }, {
    "name" : "Commissioning_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/10/2010'"
  }, {
    "name" : "Preliminary_acceptance_date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_Start_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_End_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2013'"
  }, {
    "name" : "Effective_End_Warranty_Date",
    "type" : [ "null", "string" ],
    "doc" : "Type inferred from 'null'",
    "default" : null
  }, {
    "name" : "Level_2_in_function",
    "type" : "string",
    "doc" : "Type inferred from '17/07/2015'"
  }, {
    "name" : "Baseline",
    "type" : "string",
    "doc" : "Type inferred from '2.10.23.4'"
  }, {
    "name" : "RELN_revision",
    "type" : "string",
    "doc" : "Type inferred from '0434-26.3'"
  }, {
    "name" : "TC_report",
    "type" : "string",
    "doc" : "Type inferred from 'A480140'"
  }, {
    "name" : "Last_version_Date",
    "type" : "string",
    "doc" : "Type inferred from 'A-23/09/2015'"
  }, {
    "name" : "ETCS_ID_NID_Engine",
    "type" : [ "null", "long" ],
    "doc" : "Type inferred from '13001'",
    "default" : null
  }, {
    "name" : "Item_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Item'"
  }, {
    "name" : "Path",
    "type" : "string",
    "doc" : "Type inferred from 'sites/TrWMTISnerc_Community/Lists/X4Trains'"
  } ]
}

My CREATE TABLE statement is:

Create table warranty(
  id    float,
  train_id float,
  siemens_nr    varchar(255),
  uic_nr    float,
  configuration varchar(255),
  warranty_status   varchar(255),
  warranty_data_type    varchar(255),
  of_progression    float,
  delivery_date varchar(255),
  warranty_on_delivery_date varchar(255),
  customer_status   varchar(255),
  commissioning_date    varchar(255),
  preliminary_acceptance_date   varchar(255),
  warranty_start_date   varchar(255),
  warranty_end_date varchar(255),
  effective_end_warranty_date   varchar(255),
  level_2_in_function   varchar(255),
  baseline  varchar(255),
  reln_revision varchar(255),
  tc_report varchar(255),
  last_version_Date varchar(255),
  etcs_id_nid_engine    float,
  item_type  varchar(255),
  path varchar(255)
)

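I can suggest the ExecuteGroovyScript processor, available in NiFi v1.5+.

Define a new property named SQL.mydb; you will be prompted to link its value to a database connection (a DBCPConnectionPool controller service).

Select the database in which you want to create the table,

and use this script (assuming the Avro schema is in the flow file content):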

import groovy.json.JsonSlurper
def ff = session.get()
if(!ff)return
//parse avro schema from flow file content
def schema = ff.read().withReader("UTF-8"){ new JsonSlurper().parse(it) }
//define type mapping
def typeMap = [
    "string"            : "varchar(255)",
    "long"              : "numeric(10)",
    [ "null", "string" ]: "varchar(255)",
    [ "null", "long" ]  : "numeric(10)",
]
assert schema.name && schema.name=~/^\w.*/
//build create table statement
def createTable = "create table ${schema.name} (" +
    schema.fields.collect{ "\n  ${it.name.padRight(39)} ${typeMap[it.type]}" }.join(',') +
    "\n)"
//execute statement through the custom defined property
//SQL.mydb references http://docs.groovy-lang.org/2.4.10/html/api/groovy/sql/Sql.html object
SQL.mydb.execute(createTable as String) //important to cast to String
//transfer flow file to success
REL_SUCCESS << ff
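
To try the type mapping outside NiFi first, something like the following standalone Groovy sketch may help. It is not part of the script above: the helper name toColumnType, the trimmed three-field sample schema, and the lowercased column names are assumptions made for illustration. It also maps Avro long to bigint rather than numeric(10), since an Avro long is a 64-bit value, and it treats a ["null", X] union as a nullable column while marking plain types NOT NULL.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (not from the script above): maps an Avro field type
// to a PostgreSQL column type. Only the two primitive types that appear in
// the question's schema are covered.
def toColumnType(avroType) {
    def sqlTypes = [string: 'varchar(255)', long: 'bigint']
    // A union that contains "null" becomes a nullable column
    def nullable = (avroType instanceof List) && avroType.contains('null')
    def branches = (avroType instanceof List) ? avroType.findAll { it != 'null' } : [avroType]
    assert branches.size() == 1 : "unsupported union: ${avroType}"
    def sqlType = sqlTypes[branches[0]]
    assert sqlType : "unmapped Avro type: ${avroType}"
    return nullable ? sqlType : sqlType + ' not null'
}

// Trimmed sample of the schema from the question (assumption: three fields
// are enough to exercise each mapping branch)
def schema = new JsonSlurper().parseText('''
{ "type": "record", "name": "warranty", "fields": [
  { "name": "id", "type": "long" },
  { "name": "siemens_nr", "type": "string" },
  { "name": "ETCS_ID_NID_Engine", "type": ["null", "long"], "default": null }
] }
''')

// One line per column; names are lowercased to match the hand-written table
def columns = schema.fields.collect { "  ${it.name.toLowerCase()} ${toColumnType(it.type)}" }
def body = columns.join(',\n')
def ddl = "create table if not exists ${schema.name} (\n${body}\n)".toString()

println ddl
```

Inside ExecuteGroovyScript the resulting string could be passed to SQL.mydb.execute(...) in the same way as createTable above. The "if not exists" clause is only there so that re-running the flow against an existing table does not fail; drop it if you would rather see the error.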
