Zipkin sender type and Kafka topic no longer working after updating Spring Boot



Hi, I just updated Spring Boot to version 3. In my project we had Zipkin configured to send spans to Kafka with a specific topic, and it no longer works:

zipkin:
  sender.type: kafka
  kafka.topic: topic-example

Is there a way to configure Zipkin the same way with Micrometer Tracing in application.yaml, or some other configuration?

========== NEW UPDATE ========== I tried another approach:

pom.xml

<!-- Observability dependencies -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-zipkin</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-kafka</artifactId>
</dependency>
<!-- Observability dependencies -->

KafkaConfig.java

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import zipkin2.reporter.Sender;
import zipkin2.reporter.kafka.KafkaSender;

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

    static String join(List<?> parts) {
        StringBuilder to = new StringBuilder();
        for (int i = 0, length = parts.size(); i < length; i++) {
            to.append(parts.get(i));
            if (i + 1 < length) {
                to.append(',');
            }
        }
        return to.toString();
    }

    @Bean("zipkinSender")
    Sender kafkaSender(KafkaProperties config, Environment environment) {
        // Need to get property value from Environment
        // because when using @VaultPropertySource in reactive web app
        // this bean is initiated before @Value is resolved
        // See gh-1990
        String topic = environment.getProperty("spring.zipkin.kafka.topic", "zipkin");
        Map<String, Object> properties = config.buildProducerProperties();
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        // Kafka expects the input to be a String, but KafkaProperties returns a list
        Object bootstrapServers = properties.get("bootstrap.servers");
        if (bootstrapServers instanceof List) {
            properties.put("bootstrap.servers", join((List<?>) bootstrapServers));
        }
        return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
    }
}

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

  zipkin:
    kafka.topic: user

And I tried to check whether anything arrives on the topic by attaching to the running Docker container:

docker exec -it kafka-container /bin/sh
bin/kafka-console-consumer.sh --topic topic-name --bootstrap-server localhost:9092 --property print.headers=true

It still doesn't work. Please tell me if I'm getting something wrong.

We currently don't support any sender mechanism other than HTTP. You can create a Sender bean that uses Kafka yourself. If you are interested in having different sender mechanisms added, please open an issue with Spring Boot.

Although it is not officially supported, I found a way to make it work again (Spring Boot 3.0.1):

  1. Add the io.micrometer:micrometer-tracing-bridge-otel, io.opentelemetry:opentelemetry-exporter-zipkin, io.zipkin.reporter2:zipkin-sender-kafka and org.springframework.kafka:spring-kafka dependencies.

  2. Add a configuration class like the one below; the code is copied from Sleuth's ZipkinKafkaSenderConfiguration:

    @Configuration
    @EnableConfigurationProperties(KafkaProperties.class)
    public class KafkaConfig {

        static String join(List<?> parts) {
            StringBuilder to = new StringBuilder();
            for (int i = 0, length = parts.size(); i < length; i++) {
                to.append(parts.get(i));
                if (i + 1 < length) {
                    to.append(',');
                }
            }
            return to.toString();
        }

        @Bean("zipkinSender")
        Sender kafkaSender(KafkaProperties config, Environment environment) {
            // Need to get property value from Environment
            // because when using @VaultPropertySource in reactive web app
            // this bean is initiated before @Value is resolved
            // See gh-1990
            String topic = environment.getProperty("spring.zipkin.kafka.topic", "zipkin");
            Map<String, Object> properties = config.buildProducerProperties();
            properties.put("key.serializer", ByteArraySerializer.class.getName());
            properties.put("value.serializer", ByteArraySerializer.class.getName());
            // Kafka expects the input to be a String, but KafkaProperties returns a list
            Object bootstrapServers = properties.get("bootstrap.servers");
            if (bootstrapServers instanceof List) {
                properties.put("bootstrap.servers", join((List<?>) bootstrapServers));
            }
            return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
        }
    }
    
  3. Configure Kafka in the application.yaml file:

    spring:
      kafka:
        bootstrap-servers: one-host:9092,another-host:9092
        properties:
          # Set a value for batch.size or an infinite loop will happen when trying to send data to Kafka
          batch.size: 16384
          # Configure your security, sasl or whatever else you need

    # Notice that sampling properties and others moved from 'spring.sleuth' to 'management.tracing' (double-check the property names used)
    management:
      tracing:
        sampling:
          probability: 1.0
        baggage:
          remote-fields: Some-Header
          correlation-fields: Some-Header
    

This should make it work as it did before with Spring Boot 2.x and Spring Cloud Sleuth.
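
To sanity-check the whole pipeline, one option is to create a span by hand and then watch the Kafka topic with the kafka-console-consumer command from the question. The sketch below is only illustrative: the controller name and endpoint are made up, and it assumes spring-web (or webflux) is present and that the actuator/Micrometer Tracing auto-configuration is active so an io.micrometer.tracing.Tracer bean exists:

    import io.micrometer.tracing.Span;
    import io.micrometer.tracing.Tracer;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    // Hypothetical endpoint used only to force a span to be reported through the custom sender.
    @RestController
    class TracingSmokeTestController {

        private final Tracer tracer;

        TracingSmokeTestController(Tracer tracer) {
            this.tracer = tracer;
        }

        @GetMapping("/tracing-smoke-test")
        String smokeTest() {
            // Create and finish a span manually; with sampling.probability=1.0 it should be
            // exported in Zipkin format and published to the configured Kafka topic.
            Span span = this.tracer.nextSpan().name("tracing-smoke-test").start();
            try {
                return "ok";
            } finally {
                span.end();
            }
        }
    }

Hitting the endpoint once and then running the kafka-console-consumer command shown earlier should print a JSON-encoded span on the topic.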

I managed to make it work with Spring Boot 3.

According to the Spring Cloud Sleuth 3.1 migration guide, Sleuth's API code has been migrated to Micrometer Tracing.

The Brave and OpenTelemetry bridges each have their own module in Micrometer Tracing.

Choose your Tracer tool and add its dependencies:

  1. For OpenTelemetry:
org.springframework.kafka:spring-kafka
io.micrometer:micrometer-tracing:VERSION
io.zipkin.reporter2:zipkin-sender-kafka:VERSION
io.micrometer:micrometer-tracing-bridge-otel:VERSION 
io.opentelemetry:opentelemetry-api:VERSION
org.apache.httpcomponents.client5:httpclient5:VERSION

I'm not sure whether org.apache.httpcomponents.client5:httpclient5:VERSION is strictly required, but I had to add it to make it work.

  2. For Brave:
org.springframework.kafka:spring-kafka
io.micrometer:micrometer-tracing:VERSION
io.zipkin.reporter2:zipkin-sender-kafka:VERSION   
io.zipkin.brave:brave:VERSION
io.micrometer:micrometer-tracing-bridge-brave:VERSION
  3. Configure a bean that uses a Zipkin sender to publish spans to Kafka:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ByteArraySerializer.class)
@ConditionalOnProperty(value = "management.tracing.enabled", havingValue = "true")
public class TracingKafkaSenderConfiguration {

    private static final String SENDER_BEAN_NAME = "zipkinSender";

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties(KafkaProperties.class)
    static class TracingKafkaSenderBeanConfiguration {

        static String join(List<?> parts) {
            StringBuilder to = new StringBuilder();
            for (int i = 0, length = parts.size(); i < length; i++) {
                to.append(parts.get(i));
                if (i + 1 < length) {
                    to.append(',');
                }
            }
            return to.toString();
        }

        @Bean(SENDER_BEAN_NAME)
        Sender kafkaSender(KafkaProperties config, Environment environment) {
            String topic = environment.getProperty("management.tracing.kafka.topic", "topic");
            String serviceName = environment.getProperty("management.tracing.service.name", "kafka-sender");
            Map<String, Object> properties = config.buildProducerProperties();
            properties.put("key.serializer", ByteArraySerializer.class.getName());
            properties.put("value.serializer", ByteArraySerializer.class.getName());
            properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, serviceName);
            Object bootstrapServers = properties.get("bootstrap.servers");
            if (bootstrapServers instanceof List) {
                properties.put("bootstrap.servers", join((List<?>) bootstrapServers));
            }
            return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
        }
    }
}
  4. Change the properties from 'spring.sleuth' to 'management.tracing':
management:
  tracing:
    enabled: true
    kafka:
      topic: topic
    service:
      name: kafka-sender
    sampling:
      probability: 0.1
    baggage:
      remote-fields:
        - field-one

Note that management.tracing.kafka.topic and management.tracing.service.name are custom properties specific to this Kafka-and-Spring setup; they are only read by the sender bean above and are not standard Spring Boot properties.
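
If spans still don't show up on the topic, one way to isolate the problem is to push a hand-built span directly through the custom sender bean on startup and then watch the topic with kafka-console-consumer as shown in the question. This is only a rough sketch under assumptions: the runner class name is made up, and it relies on the zipkin2 Span / SpanBytesEncoder classes that come in transitively with zipkin-sender-kafka:

import java.util.List;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.Sender;

// Hypothetical startup check: encodes one fake span and sends it through the "zipkinSender"
// bean, bypassing the tracer entirely, so a missing message on the topic points at the
// Kafka/sender configuration rather than at the tracing bridge.
@Component
class ZipkinKafkaSenderSmokeTest implements ApplicationRunner {

    private final Sender zipkinSender;

    ZipkinKafkaSenderSmokeTest(@Qualifier("zipkinSender") Sender zipkinSender) {
        this.zipkinSender = zipkinSender;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        Span span = Span.newBuilder()
                .traceId("463ac35c9f6413ad48485a3953bb6124")
                .id("463ac35c9f6413ad")
                .name("sender-smoke-test")
                .timestamp(System.currentTimeMillis() * 1000) // Zipkin timestamps are in microseconds
                .duration(1_000)
                .build();
        // Encode as Zipkin JSON v2 and send synchronously.
        zipkinSender.sendSpans(List.of(SpanBytesEncoder.JSON_V2.encode(span))).execute();
    }
}

If this test message arrives on the topic but real traces don't, the issue is more likely on the tracer/bridge side (sampling probability, missing bridge dependency) than with the Kafka sender itself.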
