嗨,我刚刚更新了spring boot到版本3,在我的项目中,我们配置zipkin配置发送span到kafka与特定主题,它现在不工作
zipkin:
sender.type: kafka
kafka.topic: topic-example
在application.yaml中是否有Micrometer跟踪以相同的方式配置zipkin ?或任何其他配置?
= = = =新的更新 ========== 我尝试了另一种方法:
pom.xml
<!--Observability dependencies-->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-kafka</artifactId>
</dependency>
<!--Observability dependencies-->
KafkaConfiguration.java
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {
static String join(List<?> parts) {
StringBuilder to = new StringBuilder();
for (int i = 0, length = parts.size(); i < length; i++) {
to.append(parts.get(i));
if (i + 1 < length) {
to.append(',');
}
}
return to.toString();
}
@Bean("zipkinSender")
Sender kafkaSender(KafkaProperties config, Environment environment) {
// Need to get property value from Environment
// because when using @VaultPropertySource in reactive web app
// this bean is initiated before @Value is resolved
// See gh-1990
String topic = environment.getProperty("spring.zipkin.kafka.topic", "zipkin");
Map<String, Object> properties = config.buildProducerProperties();
properties.put("key.serializer", ByteArraySerializer.class.getName());
properties.put("value.serializer", ByteArraySerializer.class.getName());
// Kafka expects the input to be a String, but KafkaProperties returns a list
Object bootstrapServers = properties.get("bootstrap.servers");
if (bootstrapServers instanceof List) {
properties.put("bootstrap.servers", join((List) bootstrapServers));
}
return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
}
}
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group-id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
zipkin:
kafka.topic: user
和我试图通过访问正在运行的docker容器检查日志:
docker exec -it kafka-container /bin/sh
bin/kafka-console-consumer.sh --topic topic-name --bootstrap-server localhost:9092 --property print.headers=true
还是不行,如果我弄错了请告诉我
我们目前不支持http以外的任何其他发送机制。你可以自己创建一个使用Kafka的Sender bean。如果你有兴趣添加不同的发送器机制,请在Spring Boot中提交一个问题
虽然没有官方支持,但我找到了一种使其再次工作的方法(Spring Boot 3.0.1):
-
添加
io.micrometer:micrometer-tracing-bridge-otel
,io.opentelemetry:opentelemetry-exporter-zipkin
,io.zipkin.reporter2:zipkin-sender-kafka
和org.springframework.kafka:spring-kafka
依赖项 -
添加如下配置类,代码复制自Sleuth的ZipkinKafkaSenderConfiguration:
@Configuration @EnableConfigurationProperties(KafkaProperties.class) public class KafkaConfig { static String join(List<?> parts) { StringBuilder to = new StringBuilder(); for (int i = 0, length = parts.size(); i < length; i++) { to.append(parts.get(i)); if (i + 1 < length) { to.append(','); } } return to.toString(); } @Bean("zipkinSender") Sender kafkaSender(KafkaProperties config, Environment environment) { // Need to get property value from Environment // because when using @VaultPropertySource in reactive web app // this bean is initiated before @Value is resolved // See gh-1990 String topic = environment.getProperty("spring.zipkin.kafka.topic", "zipkin"); Map<String, Object> properties = config.buildProducerProperties(); properties.put("key.serializer", ByteArraySerializer.class.getName()); properties.put("value.serializer", ByteArraySerializer.class.getName()); // Kafka expects the input to be a String, but KafkaProperties returns a list Object bootstrapServers = properties.get("bootstrap.servers"); if (bootstrapServers instanceof List) { properties.put("bootstrap.servers", join((List) bootstrapServers)); } return KafkaSender.newBuilder().topic(topic).overrides(properties).build(); } }
-
在
application.yaml
文件中配置Kafka:spring: kafka: bootstrap-servers: one-host:9092,another-host:9092 properties: # Set a value for batch.size or an infinite loop will happen when trying to send data to Kafka batch.size: 16384 # Configure your security, sasl or whatever else you need # Notice that sampling properties and others moved from 'spring.sleuth' to 'management.tracing' (double-check the property names used) management: tracing: sampling: probability: 1.0 baggage: remote-fields: Some-Header correlation-fields: Some-Header
这应该使它像以前的Spring Boot 2一样工作。x和春云侦探
我设法使它与Spring Boot 3一起工作
根据Spring Cloud Sleuth 3.1迁移指南,Sleuth的API代码已经迁移到Micrometer Tracing。
Brave和OpenTelemetry桥在Micrometer Tracing中有各自的模块。
选择您的Tracer工具并添加依赖项
- 为OpenTelemetry
org.springframework.kafka:spring-kafka
io.micrometer:micrometer-tracing:VERSION
io.zipkin.reporter2:zipkin-sender-kafka:VERSION
io.micrometer:micrometer-tracing-bridge-otel:VERSION
io.opentelemetry:opentelemetry-api:VERSION
org.apache.httpcomponents.client5:httpclient5:VERSION
不确定org.apache.httpcomponents.client5:httpclient5:VERSION
是必需的,但我不得不把它,使其工作
- 为勇敢的
org.springframework.kafka:spring-kafka
io.micrometer:micrometer-tracing:VERSION
io.zipkin.reporter2:zipkin-sender-kafka:VERSION
io.zipkin.brave:brave:VERSION
io.micrometer:micrometer-tracing-bridge-brave:VERSION
- 配置一个使用Zipkin发送器发送事件给kafka的bean
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ByteArraySerializer.class)
@ConditionalOnProperty(value = "management.tracing.enabled", havingValue = "true")
public class TracingKafkaSenderConfiguration {
private static final String SENDER_BEAN_NAME = "zipkinSender";
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KafkaProperties.class)
static class TracingKafkaSenderBeanConfiguration {
static String join(List<?> parts) {
StringBuilder to = new StringBuilder();
for (int i = 0, length = parts.size(); i < length; i++) {
to.append(parts.get(i));
if (i + 1 < length) {
to.append(',');
}
}
return to.toString();
}
@Bean(SENDER_BEAN_NAME)
Sender kafkaSender(KafkaProperties config, Environment environment) {
String topic = environment.getProperty("management.tracing.kafka.topic", "topic");
String serviceName = environment.getProperty("management.tracing.service.name", "kafka-sender");
Map<String, Object> properties = config.buildProducerProperties();
properties.put("key.serializer", ByteArraySerializer.class.getName());
properties.put("value.serializer", ByteArraySerializer.class.getName());
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, serviceName);
Object bootstrapServers = properties.get("bootstrap.servers");
if (bootstrapServers instanceof List) {
properties.put("bootstrap.servers", join((List) bootstrapServers));
}
return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
}
}
}
- 从'spring.sleuth'更改属性到"management.tracing">
management:
tracing:
enabled: true
kafka:
topic: topic
service:
name: kafka-sender
sampling:
probability: 0.1
baggage:
remote-fields:
- field-one
注意management.kafka和management.service.name是使用kafka和spring应用服务的特定配置。