带有JSON Schema的Kafka JDBC水槽连接器不起作用



使用最新的kafka和Confluent JDBC接收器连接器。发送一个非常简单的JSON消息:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "msg"
            }
        ],
        "optional": false,
        "name": "msgschema"
    },
    "payload": {
        "id": 222,
        "msg": "hi"
    }
}

但是出现错误:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

JSONLINT说JSON有效。我已经将JSON schemas.enable=true保存在KAFKA配置中。任何指针?

您需要告诉Connect您所使用的JSON中嵌入您的架构

您有:

value.converter=org.apache.kafka.connect.json.JsonConverter 

,但也需要:

value.converter.schemas.enable=true

为了使用JDBC接收器,您的流消息必须具有模式。这可以通过将AVRO与模式注册表一起使用,也可以通过将JSON与架构一起使用来实现。您可能需要删除主题,重新运行接收器,然后再启动源侧,如果最初运行源属性文件后配置了schemas.enable=true

示例:

sink.properties文件

name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true

和一个示例Worker配置文件connect-avro-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java

并执行

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink.properties

我最近遇到了同一问题,在我弄清楚缺少的内容之前,它进行了多次重试:

"设置"为我工作:

key.converter.schemas.enable=false
value.converter.schemas.enable=true

另外,请确保该表在数据库中之前存在,并且连接器不应尝试尝试创建一个表。放auto.create = false

相关内容

  • 没有找到相关文章

最新更新