Brave tracing for Kafka consumers using @KafkaListener



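I am using the Brave library (https://github.com/openzipkin/brave) for tracing, and now I would like to use it for Kafka consumers as well. I want to avoid adding Spring Sleuth and use only the Brave Kafka instrumentation: https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients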

For the Kafka consumer I use @KafkaListener. The code looks like this:

TestKafkaEndpoint.java

@Service
public class TestKafkaEndpoint {
    @KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
    public void procesMyRequest(@Payload final MyRequest request) {
        // do some magic...
    }
}

and the configuration class TestKafkaConfig.java


@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {
    @Bean
    public ConsumerFactory<String, MyRequest> testConsumerFactory() {
        final Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
        return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
    }
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(testConsumerFactory());
        factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
        return factory;
    }
}

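But I don't know how to get at the KafkaConsumer when it is created through the Spring Kafka factories, or how to apply KafkaTracing to it. Does anyone have experience with this?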

I'm not familiar with it, but TracingConsumer looks like a simple wrapper around a Consumer: https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79

You should be able to create a subclass of DefaultKafkaConsumerFactory and override the createConsumer method, which is what the listener container uses...

this.consumer =
        KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                this.consumerGroupId,
                this.containerProperties.getClientId(),
                KafkaMessageListenerContainer.this.clientIdSuffix,
                consumerProperties);

Call super.createConsumer(…) and wrap the result in a TracingConsumer.
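The subclass approach could be sketched roughly as follows. This is only a sketch under a few assumptions: the class name TracingKafkaConsumerFactory is made up for illustration, and it assumes a spring-kafka version in which the container calls the four-argument createConsumer overload shown above. As far as I can tell, TracingConsumer's constructor is not public, so the sketch wraps the delegate through KafkaTracing.consumer(...), which returns the tracing wrapper.

```java
import java.util.Map;
import java.util.Properties;

import brave.kafka.clients.KafkaTracing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical helper class (name chosen for this example only).
// Requires spring-kafka and brave-instrumentation-kafka-clients on the classpath.
class TracingKafkaConsumerFactory<K, V> extends DefaultKafkaConsumerFactory<K, V> {

    private final KafkaTracing kafkaTracing;

    TracingKafkaConsumerFactory(Map<String, Object> configs,
                                Deserializer<K> keyDeserializer,
                                Deserializer<V> valueDeserializer,
                                KafkaTracing kafkaTracing) {
        super(configs, keyDeserializer, valueDeserializer);
        this.kafkaTracing = kafkaTracing;
    }

    // Assumption: this is the overload the listener container calls.
    // Other createConsumer overloads are left untouched by this sketch.
    @Override
    public Consumer<K, V> createConsumer(String groupId, String clientIdPrefix,
                                         String clientIdSuffix, Properties properties) {
        // Let the parent build the real consumer, then decorate it with Brave tracing.
        Consumer<K, V> delegate =
                super.createConsumer(groupId, clientIdPrefix, clientIdSuffix, properties);
        return kafkaTracing.consumer(delegate);
    }
}
```

In the question's TestKafkaConfig, testConsumerFactory() would then return an instance of this class instead of DefaultKafkaConsumerFactory, with a KafkaTracing bean (for example one built with KafkaTracing.create(tracing)) passed in as the extra argument.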

If you are using spring-kafka 2.5.3 or later, you can instead add a ConsumerPostProcessor to the DefaultKafkaConsumerFactory (DKCF).

This is how Sleuth does it:

https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285

For anyone still looking for a solution to this problem, I managed to implement one using ConsumerPostProcessor.

@Configuration
@EnableKafka
public class Config {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaTracing kafkaTracing) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
        consumerFactory.addPostProcessor(kafkaTracing::consumer);
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }
    @Bean
    KafkaTracing kafkaStreamsTracing(Tracing tracing) {
        return KafkaTracing.create(tracing);
    }
}
