我向kafka主题发送数据和值模式,如下所示:
./bin/kafka-avro-console-producer
--broker-list 10.0.0.0:9092 --topic orders
--property parse.key="true"
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}'
--property key.separator="$"
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type": ["null","int"],"default": null}, {"name":"price","type": ["null","int"],"default": null}]}'
--property schema.registry.url=http://10.0.0.0:8081
然后我从kafka获得此接收器属性的数据:
{
"name": "jdbc-oracle",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "orders",
"connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
"connection.user": "[redact]",
"connection.password": "[redact]",
"auto.create": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id",
"insert.mode": "upsert",
"name": "jdbc-oracle"
},
"tasks": [
{
"connector": "jdbc-oracle",
"task": 0
}
],
"type": "sink"
}
但我只想从kafka获得json,而不需要value.schema
。如果我把kafka主题放在这个json数据上
{"id":9}${"id": {"int":9}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":61}}
我如何从kafka获得这些数据,并将oracle与合流的jdbc汇放在一起。
我想在Kafka Connect端制作模式?
另一件事是,我可以从一个kafka主题中获得两种不同类型的数据吗?它在oracle端使用jdbc接收器访问两个不同的表。
如果您有一个源主题包含没有声明模式的JSON数据,则必须添加该模式,然后才能使用JDBC接收器。
选项包括:
- ksqlDB,如下所示:https://www.youtube.com/watch?v=b-3qN_ tlYR4&t=981s
- Kafka Connect的单消息转换功能。Apache Kafka提供的SMT都无法做到这一点,但有一些原型可以做到
- 其他流处理,例如Kafka Streams
编辑:
我的意思是,我可以从一个kafka主题定义两个不同的jdbc接收器到不同的oracle表吗
是的,每个主题都可以由多个接收器使用。table.name.format
配置选项可用于根据需要将主题路由到不同的表名。