I generated a Java class (Heartbeat) with Avro, and I use a Spring Cloud Stream Processor to push messages to Kafka using this Heartbeat class.
Here is my service:
@Service
public class HeartbeatServiceImpl implements HeartbeatService {

    private Processor processor;

    public HeartbeatServiceImpl(Processor processor) {
        this.processor = processor;
    }

    @Override
    public boolean sendHeartbeat(Heartbeat heartbeat) {
        Message<Heartbeat> message =
                MessageBuilder.withPayload(heartbeat).setHeader(KafkaHeaders.MESSAGE_KEY, "MY_KEY").build();
        return processor.output().send(message);
    }
}
I have this consumer in my test package:
@Component
public class HeartbeatKafkaConsumer {

    private CountDownLatch latch = new CountDownLatch(1);

    private String payload = null;

    @KafkaListener(topics = "HEARTBEAT")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        this.payload = consumerRecord.toString();
        this.latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public Object getPayload() {
        return payload;
    }
}
Now, in my actual test class, I have this:
public class HeartbeatServiceImplIntegrationTest {

    @Autowired
    private HeartbeatServiceImpl heartbeatService;

    @Autowired
    private HeartbeatKafkaConsumer heartbeatKafkaConsumer;

    @Test
    public void assertHeartbeatPushedToKafka() throws InterruptedException {
        Heartbeat heartbeat =
                Heartbeat.newBuilder().setID("my-Test ID").setINPUTSOURCE("my-Test IS")
                        .setMSGID("my-Test 123").setMSGTIME(12345L).setRECEIVEDTIME(12345L).build();
        boolean isMessageSent = heartbeatService.sendHeartbeat(heartbeat);
        assertThat(isMessageSent).isTrue();
        heartbeatKafkaConsumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        assertThat(heartbeatKafkaConsumer.getLatch().getCount()).isEqualTo(0L);
        assertThat(heartbeatKafkaConsumer.getPayload()).isEqualTo(heartbeat);
    }
}
I can see by running ksql that the message does arrive in Kafka, so the message is there as expected. I also receive the message in my HeartbeatKafkaConsumer, but when I assert, I get this error:
Expecting:
<"ConsumerRecord(topic = HEARTBEAT, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1622134829899, serialized key size = 12, serialized value size = 75, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@765aa560, value = [B@3582e1cd)">
to be equal to:
<{"ID": "my-Test ID", "TYPE": "null", "MSG_ID": "my-Test 123", "MSG_TIME": 12345, "RECEIVED_TIME": 12345, "INPUT_SOURCE": "my-Test IS", "SBK_FEED_PROVIDER_ID": "null", "SBK_FEED_PROVIDER_NAME": "null"}>
Now, I have tried reading from my HeartbeatKafkaConsumer in many different ways, but I just cannot parse the value back into a Heartbeat.
How can I consume from Kafka as a Heartbeat so that I can test it against the originally sent message? I cannot even retrieve it as a String.
Oh, and here is my application.properties Kafka configuration:
spring.cloud.stream.default.producer.useNativeEncoding=true
spring.cloud.stream.default.consumer.useNativeEncoding=true
spring.cloud.stream.bindings.input.destination=HEARTBEAT
spring.cloud.stream.bindings.input.content-type=application/*+avro
spring.cloud.stream.bindings.output.destination=HEARTBEAT
spring.cloud.stream.bindings.output.content-type=application/*+avro
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.consumer-properties.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.binder.consumer-properties.key.serializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.specific.avro.reader=true
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myclient
spring.kafka.consumer.auto-offset-reset=earliest
Use consumerRecord.value() instead of toString(). It will be a byte[], which you can pass into an ObjectMapper to deserialize it into a Heartbeat.
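As a rough sketch of that first option (this assumes the record value arrives as JSON bytes; with the Avro serializers configured in the properties above, an Avro-aware decoder would be needed instead), the listener could look like:

```java
// Hypothetical rework of HeartbeatKafkaConsumer.receive(): read the raw
// byte[] value from the record and deserialize it with Jackson's
// ObjectMapper. Assumes the bytes are JSON -- with the Avro serializers
// shown above, an Avro decoder would be required instead.
@KafkaListener(topics = "HEARTBEAT")
public void receive(ConsumerRecord<?, byte[]> consumerRecord) throws IOException {
    Heartbeat heartbeat = new ObjectMapper().readValue(consumerRecord.value(), Heartbeat.class);
    this.payload = heartbeat; // the payload field would need to be Object, not String
    this.latch.countDown();
}
```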
Alternatively, just configure the consumer to use a JsonDeserializer, and consumerRecord.value() will be a Heartbeat. You need to configure the deserializer to tell it which type to create.
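For example, that configuration might look like the following (the property names come from Spring Boot and Spring Kafka; `com.example` is a placeholder for the actual package of the generated Heartbeat class):

```properties
# Sketch: point the test consumer at Spring Kafka's JsonDeserializer and
# tell it which type to create. "com.example" is a placeholder package.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Heartbeat
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example
```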
The third (and easiest) option is to add a JsonMessageConverter @Bean (Boot will wire it into the listener container) and change your method to
@KafkaListener(topics = "HEARTBEAT")
public void receive(Heartbeat heartbeat) {
    ...
}
The framework tells the converter which type to create from the method signature.
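A minimal version of that bean might look like this (JsonMessageConverter comes from spring-kafka; in older versions the class was StringJsonMessageConverter):

```java
// Sketch: expose a JsonMessageConverter bean; Spring Boot's auto-configuration
// wires it into the @KafkaListener container factory, so the framework can
// convert the record payload to the Heartbeat type in the method signature.
@Bean
public JsonMessageConverter jsonMessageConverter() {
    return new JsonMessageConverter();
}
```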
https://docs.spring.io/spring-kafka/docs/current/reference/html/#messaging-message-conversion