如何手动控制使用骆驼卡夫卡的偏移提交



我正在使用骆驼kafka组件,而我尚不清楚引擎盖下发生了什么时发生了什么。如下所示,我正在汇总记录,我认为对于我的用例,只有在将记录保存到SFTP之后才能提交偏移是有意义的。

可以手动控制我何时可以执行该提交?

private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("kafka:{{mh.topic}}?" + getKafkaConfigString())
        .unmarshal().string()
        .aggregate(constant(true), new MyAggregationStrategy())
            .completionSize(1000)
            .completionTimeout(1000)
        .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime())
        .to("sftp://" + getSftpConfigString())
        // how to commit offset only after saving messages to SFTP?
        ;
    }
    private final class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class);
            String body = oldBody + newBody;
            oldExchange.getIn().setBody(body);
            return oldExchange;
        }
    }
}
private static String getSftpConfigString() {
        return "{{sftp.hostname}}/{{sftp.dir}}?"
                + "username={{sftp.username}}"
                + "&password={{sftp.password}}"
                + "&tempPrefix=.temp."
                + "&fileExist=Append"
                ;
}
private static String getKafkaConfigString() {
        return "brokers={{mh.brokers}}" 
            + "&saslMechanism={{mh.saslMechanism}}"  
            + "&securityProtocol={{mh.securityProtocol}}"
            + "&sslProtocol={{mh.sslProtocol}}"
            + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" 
            + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}"
            + "&saslJaasConfig={{mh.saslJaasConfig}}" 
            + "&groupId={{mh.groupId}}"
            ;
}

不,你不能。Kafka每x秒执行一次自动提交(您可以配置此)。

Camel-Kafka没有手动提交支持。此外,这是不可能的,因为聚合器与Kafka消费者分开,并且是执行提交的消费者。

我认为这是最新版本的骆驼(2.22.0)(doc)的变化。

// Endpoint configuration &autoCommitEnable=false&allowManualCommit=true
public void process(Exchange exchange) {
     KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
     manual.commitSync();
}

您可以通过使用偏移存储库(骆驼文档)

来控制手动偏移订单(使用多线程途径(使用聚合器进行典范))<
@Override
public void configure() throws Exception {
      // The route
      from(kafkaEndpoint())
            .routeId(ROUTE_ID)
            // Some processors...
            // Commit kafka offset
            .process(MyRoute::commitKafka)
            // Continue or not...
            .to(someEndpoint());
}
private String kafkaEndpoint() {
    return new StringBuilder("kafka:")
            .append(kafkaConfiguration.getTopicName())
            .append("?brokers=")
            .append(kafkaConfiguration.getBootstrapServers())
            .append("&groupId=")
            .append(kafkaConfiguration.getGroupId())
            .append("&clientId=")
            .append(kafkaConfiguration.getClientId())
            .append("&autoCommitEnable=")
            .append(false)
            .append("&allowManualCommit=")
            .append(true)
            .append("&autoOffsetReset=")
            .append("earliest")
            .append("&offsetRepository=")
            .append("#fileStore")
            .toString();
}
@Bean(name = "fileStore", initMethod = "start", destroyMethod = "stop")
private FileStateRepository fileStore() {
    FileStateRepository fileStateRepository = 
    FileStateRepository.fileStateRepository(new File(kafkaConfiguration.getOffsetFilePath()));
    fileStateRepository.setMaxFileStoreSize(10485760); // 10MB max
    return fileStateRepository;
}
private static void commitKafka(Exchange exchange) {
    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    manual.commitSync();
}

最新更新