Kafka synchronous communication / two-way communication with ReplyingKafkaTemplate causes lag on the response/reply topic



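We have several microservices in our product, and some business use cases require one microservice (TryServiceOne) to delegate a request to another microservice (TryServiceThree). The end user is waiting for the API response while this happens, so we use ReplyingKafkaTemplate to be able to respond to the caller right away. Everything appears to work, but we see lag on the reply topic, which floods our alerting system with alerts. In the background the messages are read by the RequestReplyFuture and processed successfully, yet the lag reported by the Kafka broker keeps growing. Please suggest how to avoid the lag.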

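Important: we deploy the microservices as a cluster with multiple nodes, so we use a custom partitioner to always route the response/reply topic to a single partition.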

TryServiceOneKafkaConfiguration.class

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    return props;
}

@Bean
public ProducerFactory<String, RequestModel> requestProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, RequestModel> kafkaTemplate() {
    return new KafkaTemplate<>(requestProducerFactory());
}

@Bean
public ReplyingKafkaTemplate<String, RequestModel, ResponseModel> replyKafkaTemplate(
        ProducerFactory<String, RequestModel> pf,
        KafkaMessageListenerContainer<String, ResponseModel> container) {
    return new ReplyingKafkaTemplate<>(pf, container);
}

@Bean
public KafkaMessageListenerContainer<String, ResponseModel> replyContainer(ConsumerFactory<String, ResponseModel> cf) {
    TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset("RESPONSE_TOPIC", 0);
    ContainerProperties containerProperties = new ContainerProperties(topicPartitionOffset);

    containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

My SendAndReceive service component is shown below:

RequestModel requestModel= new RequestModel();
distributorRequestEvent.setDistributorModel(producerRecord);
// create producer record
ProducerRecord<String, RequestModel> record = new ProducerRecord<String, RequestModel>("REQUEST_TOPIC", requestModel);
// set reply topic in header
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "RESPONSE_TOPIC".getBytes(StandardCharsets.UTF_8)));

kafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(30));
LOGGER.info("Sending message ... {}",producerRecord);
RequestReplyFuture<String, RequestModel, ResponseModel> sendAndReceive = kafkaTemplate.sendAndReceive(record);
// confirm if producer produced successfully
SendResult<String, RequestModel> sendResult = sendAndReceive.getSendFuture().get();

// get consumer record
ConsumerRecord<String, ResponseModel> consumerRecord = sendAndReceive.get();

return consumerRecord.value();
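
For readers unfamiliar with how a request/reply call over a shared reply topic is matched up, here is a minimal plain-Java sketch of the idea. It is a simplified model, not Spring Kafka's implementation: the class `ReplyCorrelator`, its methods, and the string payloads are all hypothetical. It assumes each request carries a correlation id, that every node reading the single reply partition sees every reply, and that a node simply ignores replies whose correlation id it did not send.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical model of correlation-based request/reply.
// It is not Spring Kafka's implementation; payloads are plain strings.
class ReplyCorrelator {
    // Requests sent by this node that are still waiting for a reply.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a request and returns a future that completes with its reply.
    CompletableFuture<String> expectReply(String correlationId, long timeoutSeconds) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // Fail the future if no reply arrives in time; always drop the map entry.
        future.orTimeout(timeoutSeconds, TimeUnit.SECONDS)
              .whenComplete((reply, error) -> pending.remove(correlationId));
        return future;
    }

    // Called for every record read from the shared reply partition.
    // Returns false when the reply was not requested by this node.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // sent by another node, or already timed out
        }
        future.complete(payload);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        CompletableFuture<String> reply = correlator.expectReply("req-1", 30);
        correlator.onReply("req-from-other-node", "ignored"); // not ours, skipped
        correlator.onReply("req-1", "pong");                  // completes the future
        System.out.println(reply.join());
    }
}
```

Note that in this model a reply is "processed" as soon as its future completes; whether the consumer's offset for that reply is ever committed is a separate concern.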

TryServiceThree microservice Kafka configuration

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(JsonDeserializer.TYPE_MAPPINGS, RequestModel.class);
    return props;
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    return props;
}

@Bean
public ConsumerFactory<String, RequestModel> requestConsumerFactory() {
    JsonDeserializer<RequestModel> deserializer = new JsonDeserializer<>(RequestModel.class);
    deserializer.setRemoveTypeHeaders(false);
    deserializer.addTrustedPackages("*");
    deserializer.setUseTypeMapperForKey(true);
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            deserializer);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, RequestModel>> requestListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, RequestModel> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(requestConsumerFactory());
    //   factory.setMessageConverter(new JsonMessageConverter());
    factory.setReplyTemplate(replyTemplate());
    return factory;
}

@Bean
public ProducerFactory<String, ResponseModel> replyProducerFactory() {
    ProducerFactory<String, ResponseModel> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
    return producerFactory;
}

@Bean
public KafkaTemplate<String, ResponseModel> replyTemplate() {
    return new KafkaTemplate<>(replyProducerFactory());
}

CustomPartitioning on TryServiceThree

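import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Always route to partition 0, the partition the reply container is assigned to.
        return 0;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}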

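Use

containerProperties.setAckMode(ContainerProperties.AckMode.BATCH);

in the reply container. With AckMode.MANUAL the listener is expected to acknowledge the records itself; the ReplyingKafkaTemplate does not do that, so the offsets for the reply topic are never committed and the reported lag keeps growing even though the replies are processed. BATCH is the default ack mode, so removing the setAckMode call should have the same effect.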
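
To illustrate why the replies can be processed while the lag still grows, here is a toy plain-Java model of a single partition. It is only a sketch under simplifying assumptions, not Kafka client code: the class `LagModel` and its method are hypothetical, lag is taken to be the log end offset minus the committed offset, BATCH is modelled as "the container commits after each processed batch", and MANUAL as "nothing is committed unless the listener acknowledges".

```java
// Toy model of consumer lag on one partition; illustrative only, not Kafka client code.
class LagModel {
    enum AckMode { MANUAL, BATCH }

    // Simulates polling batches of the given sizes and returns
    // lag = log end offset - committed offset after all batches were processed.
    static long lagAfter(AckMode mode, boolean listenerAcknowledges, int... batchSizes) {
        long logEndOffset = 0;
        long committedOffset = 0;
        for (int size : batchSizes) {
            logEndOffset += size; // the producer appended `size` replies
            // The consumer reads and processes every record in the batch here.
            boolean commit = mode == AckMode.BATCH || listenerAcknowledges;
            if (commit) {
                committedOffset = logEndOffset; // offsets committed up to the end of the batch
            }
        }
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        System.out.println("MANUAL, never acknowledged: " + lagAfter(AckMode.MANUAL, false, 3, 2, 5));
        System.out.println("BATCH: " + lagAfter(AckMode.BATCH, false, 3, 2, 5));
    }
}
```

In this model the MANUAL case without acknowledgments accumulates lag equal to the total number of replies, while BATCH ends every batch with zero lag, which matches the symptom described in the question.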
