Unit testing the Kafka Streams binder with JUnit 5



I am trying to write unit test cases for the Kafka Streams binder using the functional approach.

@FunctionalSpringBootTest(classes = {Application.class}, properties = {"spring.cloud.stream.kafka.binder.brokers=localhost:9092"})
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092", "socket.request.max.bytes=409296129"},
        topics = {"inputTopic"}, controlledShutdown = true)
@DirtiesContext
public class KafkaStreamsBinderTest { // renamed from "Test", which would shadow the @Test annotation

    // The original snippet used these two fields without showing their declarations;
    // the declarations here are assumed.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    private Consumer<String, String> consumer;

    @Test
    public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id",
                "false", embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
                consumerProps);
        consumer = cf.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "inputTopic");
        SpringApplication app = new SpringApplication(Application.class); // Spring Boot application class
        app.setWebApplicationType(WebApplicationType.NONE);
        ConfigurableApplicationContext context = app.run("--server.port=0",
                "--spring.jmx.enabled=false",
                "--spring.cloud.stream.function.bindings.process-in-0=input",
                "--spring.cloud.stream.function.bindings.process-out-0=output",
                "--spring.cloud.stream.bindings.input.destination=inputTopic",
                "--spring.cloud.stream.bindings.output.destination=outputTopic",
                "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
                "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
                        + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
                "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
                        + "=org.apache.kafka.common.serialization.Serdes$StringSerde",
                "--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=StreamProcessorStorecloseApplication-xyz",
                "--spring.cloud.stream.kafka.streams.binder.brokers="
                        + embeddedKafka.getBrokersAsString());
        try {
            receiveAndValidateInputTopic();
            // Assertions on StreamsBuilderFactoryBean
            StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
                    .getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
            CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean,
                    "cleanupConfig", CleanupConfig.class);
            assertThat(cleanup.cleanupOnStart()).isFalse();
            assertThat(cleanup.cleanupOnStop()).isFalse();
        }
        finally {
            context.close();
        }
    }

    private void receiveAndValidateInputTopic() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
                senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("inputTopic");
        template.sendDefault("{\"id\":\"123\"}");
        ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
                "inputTopic");
        assertThat(cr.value().contains("Count for product with ID 123: 1")).isTrue();
    }
}

In the receiveAndValidateInputTopic() method I send the POJO that inputTopic expects and check for the expected output with isTrue(), but the test case fails. getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);

KStream bean: this is the reference I found on GitHub. I am doing something similar.

@EnableAutoConfiguration
public static class ProductCountApplication {

    @Bean
    public Function<KStream<Object, Product>, KStream<Integer, String>> process() {
        return input -> input.filter((key, product) -> product.getId() == 123)
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Grouped.with(new JsonSerde<>(Product.class),
                        new JsonSerde<>(Product.class)))
                .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
                .count(Materialized.as("id-count-store")).toStream()
                .map((key, value) -> new KeyValue<>(key.key().id,
                        "Count for product with ID 123: " + value));
    }
}

static class Product {

    Integer id;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }
}

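If you only care about testing the Kafka components, I suggest removing Spring from the test classpath, as shown in the documentation. More importantly, that approach does not need to start an in-memory Kafka broker, so it really is a unit test rather than an integration test.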

The Spring KStream function then simply defines a sub-topology, and that logic should be extracted into a separate function.

public Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input = builder.stream("input");
    KStream<String, String> output = functionToTest(input);  // refactored so the Spring function can call it too
    return builder.build();
}

// ... elsewhere
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> functionToTest(input);
}
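To make the broker-free approach concrete, here is a minimal sketch of such a test using TopologyTestDriver from the kafka-streams-test-utils artifact (assumed to be on the test classpath). The body of functionToTest, the class name TopologySketch, the helper runOnce, and the topic names "input" and "output" are all hypothetical stand-ins, not the asker's real logic; the filter is a deliberately simple string match rather than the windowed count from the question.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class TopologySketch {

    // Hypothetical stand-in for the logic extracted from the Spring function:
    // keep values mentioning "123" and prefix them with a label.
    static KStream<String, String> functionToTest(KStream<String, String> input) {
        return input
                .filter((key, value) -> value != null && value.contains("123"))
                .mapValues(value -> "Seen product: " + value);
    }

    // Wires the extracted function between an input and an output topic.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
                builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));
        functionToTest(input).to("output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes one record through the topology in-process and returns the
    // resulting output value, or null if the record was filtered out.
    static String runOnce(String key, String value) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-sketch-test");
        // Placeholder address; the test driver never connects to a broker.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "output", new StringDeserializer(), new StringDeserializer());
            in.pipeInput(key, value);
            return out.isEmpty() ? null : out.readValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("k1", "{\"id\":123}"));
    }
}
```

In a JUnit 5 test, the body of runOnce would typically live in a @Test method with assertions on the values read from the output topic. Because everything runs in-process, no @EmbeddedKafka or Spring context is involved.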

As for your existing test, using a Consumer and a KafkaTemplate does not exercise what Kafka Streams does.

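I used the following reference. It relies on older dependency versions, but that is fine: I replaced them with the latest ones. I also modified the test producer code, because the existing code did not work for me. The modified code is below. It should be especially useful if you are using Kafka Avro.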

Add these annotations to the test class:
@FunctionalSpringBootTest(classes = {<MainClass>Application.class},properties = {"spring.cloud.stream.kafka.binder.brokers=localhost:9092"})
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9092","port=9092","socket.request.max.bytes=409296129"},
topics = {"so0544in","so0544out"}, controlledShutdown = true)

Test producer configuration:

private Producer<String, Pojoclass> configureProducer() {
    var producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafka));
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    producerProps.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, "true");
    producerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "600000");
    producerProps.put(SCHEMA_REGISTRY_URL_CONFIG, "mock://127.0.0.1:8081");
    producerProps.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
    producerProps.put(USER_INFO_CONFIG, "test:test");
    return new DefaultKafkaProducerFactory<String, Pojoclass>(producerProps).createProducer();
}

Use the producer in the test case as follows:
var producer = configureProducer();
Pojoclass pojoclass = TestUtility.getSample();
producer.send(new ProducerRecord<>(INPUT_TOPIC, "key-1", pojoclass));
