我有一个来自kafka主题的大型json,我正在将其转换为Java对象,以便在find DB中提取我需要的值。有些记录中会有一系列缺陷,我需要捕获这些缺陷并将其发送到另一个主题,这样它们就可以在DB中的自己的表中结束。使用接收器连接器将值插入数据库,这就是我们使用多个主题的原因。
我发现了分支和拆分,但这似乎更多的是为了确定一个已消费的记录应该转到哪个主题,而不是将记录的不同部分发送到不同的主题。有没有办法做到这一点,或者我需要在某个地方更改我的架构。
@Autowired
void buildPipeline(StreamsBuilder builder) throws Exception{
KStream<String, String> messageStream = builder.stream(inputTopic, Consumed.with(STRING_SERDE, STRING_SERDE));
logger.info("started consumer");
System.out.println(messageStream.toString());
KStream<String, String> auditRecords = messageStream
.map((key, value) -> {
try {
return new KeyValue<>("", mapStringToAuditOutput(value));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
});
auditRecords.to(outputTopic);
}
public String mapStringToAuditOutput(String input) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
AuditOutput auditResults = null;
try {
auditResults= mapper.readValue(input, AuditOutput.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//check for multiple defects
if(auditResults.getCaseObject().getDefects() != null){
//send defects to separate topic
}
String auditJson = JsonFlattener.flatten(mapper.writeValueAsString(auditResults));
System.out.println(auditJson);
return auditJson;
}
找到了分支和拆分,但看起来这更多的是为了确定一个已消费的记录应该转到的哪个主题
正确。在分支之前,您需要过滤+map/mapValues以将部分/全部事件发送到不同的主题
更具体地说,创建中间KStream实例并多次使用to()
例如,
// Change the value serde to use your JSON class
KStream<String, AuditOutput> auditRecords = messageStream
.mapValues(value -> {
try {
// input Stream can be <String, String> but this step would be automatic if using a JSONSerde
return mapStringToAuditOutput(value));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}).filter((key, value) -> Objects.nonNull(value)); // remove bad JSON events
Map<String, KStream<String, AuditOutput>> branches = auditRecords.split(Named.as("Branch-"))
.branch((key, value) -> value.getCaseObject().getDefects() != null, /* first predicate */
Branched.as("Defects"))
.defaultBranch(Branched.as("NoDefects")) /* default branch */
);
branches.get("Branch-Defects").to(...)
branches.get("Branch-NoDefects").mapValues(... flatten ... ).to(...)