>我有一个 Kafka 主题,其中包含具有 Avro 序列化密钥和 Avro 序列化值的消息。
我正在尝试设置一个接收器连接器,以将这些值放入 postgres 数据库(在本例中为 AWS RDS)中的表中。
我已经尝试了主题、消息和接收器配置本身的许多变体,但看看下面的示例,如果有人能就我出错的地方提供指导,那就太好了!:)
我的主题具有以下架构(在架构注册表中)...
密钥架构
{
"type": "record",
"name": "TestTopicKey",
"namespace": "test.messaging.avro",
"doc": "Test key schema.",
"fields": [
{
"name": "unitId",
"type": "int"
}
]
}
值架构
{
"type": "record",
"name": "TestTopicValues",
"namespace": "test.messaging.avro",
"doc": "Test value schema.",
"fields": [
{
"name": "unitPrice",
"type": "int",
"doc": "Price in AUD excluding GST."
},
{
"name": "unitDescription",
"type": "string"
}
]
}
我正在使用"kafka-avro-console-producer"手动生成该主题的记录,如下所示:
/bin/kafka-avro-console-producer --broker-list kafka-box-one:9092 --topic test.units --property parse.key=true --property "key.separator=|" --property "schema.registry.url=http://kafka-box-one:8081" --property key.schema='{"type":"record","name":"TestTopicKey","namespace":"test.messaging.avro","doc":"Test key schema.","fields":[{"name":"unitId","type":"int"}]}' --property value.schema='{"type":"record","name":"TestTopicValues","namespace":"test.messaging.avro","doc":"Test value schema.","fields":[{"name":"unitPrice","type":"int","doc":"Price in AUD excluding GST."},{"name":"unitDescription","type":"string"}]}'
一旦该生产者启动,我就可以成功地将记录添加到主题中,如下所示:
{"unitId":111}|{"unitPrice":15600,"unitDescription":"A large widget thingy."}
注意:我也可以像预期的那样成功地使用kafka-avro-console-consumer。
我试图沉入的 postgres 表如下所示:
CREATE TABLE test_area.unit_prices (
unitId int4 NOT NULL,
unitPrice int4 NULL,
unitDescription text NULL,
CONSTRAINT unit_prices_unitid_pk PRIMARY KEY (unitId)
);
我的接收器连接器如下所示:
{
"name": "test.area.unit.prices.v01",
"config": {
"connector.class": "JdbcSinkConnector",
"topics": "test.units",
"group.id": "test.area.unit.prices.v01",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://kafka-box-one:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-box-one:8081",
"connection.user": "KafkaSinkUser",
"connection.password": "KafkaSinkPassword",
"connection.url": "jdbc:postgresql://unit-catalogue.abcdefghij.my-region-1.rds.amazonaws.com:5432/unit_sales?currentSchema=test_area",
"table.name.format": "unit_prices",
"auto.create": false,
"auto.evole": "false"
}
}
我的期望是,在接收器显示为正在运行后不久,记录就会出现在 postgres 表中。 然而,什么都没有下沉。
附加说明:
- 我可以从 Kafka Connect 框中连接并写入 postgres RDS 实例,该框正在使用使用 usql 的接收器连接器使用凭据发布此接收器连接器。 接收器
- 连接器状态为"正在运行",这向我表明接收器语法中没有错误。
为这个人非常迟来的回复道歉。 在最终让日志正常工作后,这是一个代理问题。 谢谢大家的帮助。