Kafka Connect没有模式,只有JSON



我想使用带有JSON和不带模式的JDBC接收器连接器。他们写(来源(:

如果您需要在没有连接数据的模式注册表的情况下使用JSON可以使用Kafka支持的JsonConverter。下面的示例显示了JsonConverter键和添加到配置:

key.coverter=org.apache.kafka.connect.json.json转换器value.coverter=org.apache.kafka.connect.json.JsonConverterkey.coverter.schemas.enable=falsevalue.coverter.schemas.enable=错误

当properties键.coverter.schemas.enable和value.coverter.schemas.enable设置为true,则键或值为不被视为纯JSON,而是被视为复合JSON对象包含内部模式和数据。当这些为源连接器启用,架构和数据都在复合JSON对象。当这些对于汇点连接器被启用时,从复合JSON对象中提取模式和数据。请注意,此实现从不使用Schema Registry。

当properties键.coverter.schemas.enable和value.coverter.schemas.enable仅设置为false(默认值(数据是在没有模式的情况下传递的。这减少了有效载荷不需要模式的应用程序的开销。

我配置的连接器:

{
"name": "noschemajustjson",
"config": {
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"schemas.enable": "false",
"name": "noschemajustjson",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "testconnect2",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "********",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "utp",
"auto.create": "false",
"auto.evolve": "false"
}
}

但我仍然得到错误:

由:org.apache.kafka.connect.errors.ConnectException引起:接收器连接器"noschemajustjson2"配置为"delete.enabled=false"one_answers"pk.mode=none",因此需要具有非null结构值和非null结构架构的记录,但是在找到记录(主题="测试连接2",分区=0,偏移量=0,时间戳=1626416739697(具有HashMap值和null值架构。

那么,我应该怎么做才能强制Connect在没有架构的情况下工作(只有纯JSON(?

我想使用带有JSON的JDBC接收器连接器,而不使用模式

您不能这样做-JDBC接收器连接器流到关系数据库,而关系数据库具有模式:-D因此,JDBC接收器连接器需要为数据提供模式。

根据您的数据来源,您有不同的选择。

  • 如果它是从Kafka Connect获取的,请使用支持模式的转换器(Avro、Protobuf、JSON模式(
  • 如果它是由您可以控制的应用程序生成的,请让该应用程序使用模式(Avro、Protobuf、JSONSchema(串行化该数据
  • 如果它来自您无法控制的地方,那么您需要预处理该主题以添加一个显式模式,并将其写入一个新主题,然后由JDBC接收器连接器使用

引用&资源:

  • Kafka Connect JDBC接收器深度挖掘:使用主键
  • 使用ksqlDB将模式应用于JSON数据

最新更新