Reactive Kafka: exactly-once processing with transactions



Initially triggered by an API call:
1. Service A produces m1 to topic1 (non transactional send)
2. Service B consumes topic1 and does some processing
(begin tx)
3. Service B produces m2 to topic2
(commit tx)
4. Service A consumes topic2
(begin tx)

Here is my producer configuration:

final Map<String, Object> props = Maps.newConcurrentMap();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all"); 
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

Here is my consumer configuration:

final Map<String, Object> props = Maps.newHashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

I read through this example scenario.

I tried to follow it, but I ran into some problems:

Here is my producer code:

@Override
public Mono<SenderResult<Void>> buy(Message msg) {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    return kafkaProducerTemplate.send(mytopic, msg);
}

My consumer code:

@Override
public void run(ApplicationArguments arg0) throws Exception {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = kafkaConfig.getKafkaConsumerTemplate(mytopic, Message.class);
    final Flux<ConsumerRecord<String, Message>> flux = kafkaConsumerTemplate.receiveExactlyOnce(kafkaProducerTemplate.transactionManager())
            .concatMap(receiverRecordFlux -> receiverRecordFlux );
    flux.subscribe(record -> {
        final Message message = record.value();
        System.out.printf("received message: timestamp=%s key=%s value=%s\n",
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                message);
        transactionService.processAndSendToNextTopic(message)
                .doOnSuccess(aVoid -> kafkaProducerTemplate.transactionManager().commit())
                .subscribe();
    });
}

When I test the happy case, producing a message and delivering it to a single-partition topic, I always get this error:

Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

Can someone show me how to begin the transaction correctly, and when to commit it?

Note that all of this works fine if I don't use the transactional receive (that is, receive instead of receiveExactlyOnce).

You may have to configure the KafkaTransactionManager synchronization as SYNCHRONIZATION_ALWAYS.

Please check.
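
As a rough illustration of that suggestion, here is a minimal sketch of setting the synchronization mode on a Spring Kafka KafkaTransactionManager. It assumes spring-kafka and spring-tx are on the classpath and that a transaction-capable ProducerFactory already exists; the class and method names TxManagerConfigSketch and kafkaTransactionManager are hypothetical. Whether this setting resolves the error in the reactive setup from the question is not confirmed here.

```java
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;

// Hypothetical helper class, for illustration only.
final class TxManagerConfigSketch {

    // producerFactory is assumed to be configured for transactions already
    // (for example with a transactional id prefix).
    static <K, V> KafkaTransactionManager<K, V> kafkaTransactionManager(
            ProducerFactory<K, V> producerFactory) {
        KafkaTransactionManager<K, V> tm = new KafkaTransactionManager<>(producerFactory);
        // Always activate transaction synchronization, as the answer suggests.
        tm.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        return tm;
    }
}
```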

See also:

Transaction synchronization in Spring Kafka

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/kafka/transaction/kafkatransactionmanager.html
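
For context on the error quoted in the question: a transactional Kafka producer rejects a second begin while a transaction is still open, which is what "IN_TRANSACTION to IN_TRANSACTION" reports. The sketch below is a simplified, library-free model of that rule, not the Kafka client or Reactor Kafka code; TxStateDemo, TxModel, processSequentially and beginTwice are hypothetical names. It assumes the failure comes from a new batch's transaction starting before the previous one was committed, which the question's code does not rule out because the commit is triggered from a separate subscription.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the begin/commit ordering rule; NOT the Kafka client class.
final class TxStateDemo {

    /** The two states this toy model tracks. */
    enum State { READY, IN_TRANSACTION }

    /** Toy stand-in for a transactional producer. */
    static final class TxModel {
        private State state = State.READY;

        /** Opens a transaction; only allowed when no transaction is open. */
        void begin() {
            if (state != State.READY) {
                throw new IllegalStateException("Invalid transition attempted from state "
                        + state + " to state " + State.IN_TRANSACTION);
            }
            state = State.IN_TRANSACTION;
        }

        /** Closes the open transaction and returns to READY. */
        void commit() {
            if (state != State.IN_TRANSACTION) {
                throw new IllegalStateException("commit called without an open transaction");
            }
            state = State.READY;
        }

        State state() {
            return state;
        }
    }

    /** Handles batches strictly one at a time: begin, process every record, commit. */
    static List<String> processSequentially(TxModel tx, List<List<String>> batches) {
        List<String> log = new ArrayList<>();
        for (List<String> batch : batches) {
            tx.begin();
            for (String record : batch) {
                log.add("processed " + record);
            }
            tx.commit();
            log.add("committed");
        }
        return log;
    }

    /** Begins a second transaction before committing the first and reports the outcome. */
    static String beginTwice(TxModel tx) {
        tx.begin();
        try {
            tx.begin(); // the previous transaction is still open
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(processSequentially(new TxModel(),
                List.of(List.of("m1", "m2"), List.of("m3"))));
        System.out.println(beginTwice(new TxModel()));
    }
}
```

In this model the sequential variant never fails because each commit happens before the next begin. The practical takeaway, under the stated assumption, is to make the commit part of each batch's own pipeline so that it completes before the next batch starts.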
