如何捕获不符合卡夫卡流连接条件的 kafka 记录?



我正在通过加入带有ktable的kstream来进行数据扩充。kstream 包含车辆发送的消息,ktable 包含车辆数据。我遇到的问题是我想从表中没有相应联接键的流中捕获消息。Kafka 流静默地跳过他们没有加入匹配的记录。有没有办法将这些记录发送到不同的主题,以便以后可以处理它们?

StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, VinMappingInfo> vinMappingTable = builder.table(vinInfoTopic, Consumed.with(Serdes.String(), valueSerde));
        KStream<String, VehicleMessage> vehicleStream = builder.stream(sourceTopic);
        vehicleStream.join(vinMappingTable, (vehicleMsg, vinInfo) -> {
            log.info("joining {} with vin info {}", vehicleMsg.getPayload().getId(), vinInfo.data.vin);
            vehicleMsg.setVin(vinInfo.data.vin);
            return vehicleMsg;
        }, Joined.with(null, null, valueSerde))
                .to(destinationTopic);
        final Topology topology = builder.build();
        log.info("The topology of connected processor nodes: n {}", topology.describe());
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.cleanUp();
        streams.start();

您可以使用左连接:

stream.leftJoin(table,...);

这可确保输入流中的所有记录都在输出流中。在这种情况下,ValueJoiner将与apply(streamValue, null)一起调用。

相关内容

  • 没有找到相关文章

最新更新