我有一个kafka主题,它使用Debezium mysql源连接器从mysql数据库获取数据,以下是其中一条消息的格式:
{
"Message": {
"schema": {
"type": "struct",
"fields": [
...
],
"optional": true,
"name": "mysql-server-1.inventory.somename"
},
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Doof",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Marry",
"email": "annek@noanswer.org"
},
"source": {
"db": "inventory",
"table": "customers",
...
"query": "Update customers set last_name = 'Marry' where id = 1004"
}
}
}
}
我想使用jdbc接收器连接器将ts_ms, before, after
和id
(从对象/行(列推送到另一个数据库中,表模式为(id,before(text),after(text),timestamp)
,这对kafka来说是个新手,无法理解:
如何仅从消息中提取这些字段,以推送并忽略其他字段?
如何将before、after字段转换为字符串/序列化格式?
如何从对象中提取
id
?(如果是插入操作,则前为空,如果是删除操作,则后为空(
对于上面的消息,接收器目的地表的末尾应该有如下数据:
id: 1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after: '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815
您可以使用Kafka Connect转换链,就像这个解决方案一样。
您可以创建一个DTO(从kafka主题中获得的json负载的Java对象(。利用这个在线转换器可以帮助您将json转换为Java对象。[http://pojo.sodhanalibrary.com/][1]
一旦收到来自kafka主题的消息,就可以使用objectmapper转换json并将其映射到适当的DTO对象中。一旦您准备好了对象。您可以通过调用getId((、getBefore((等来利用该对象提取您想要的字段
以下是一些有助于您理解的参考代码:
@KafkaListener(topics = "test")
public void listen(String payload) {
logger.info("Message Received from Kafka topic: {}", payload);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);
logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));
logger.info("Get Before:{}", dtoObject.getId());
}