How do I use spring-cloud-stream output bindings in a unit test?



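I'm using spring cloud stream to transform a Kafka topic and turn it into a table with ".toTable()". In the application I set up input and output bindings. This works well against a Kafka cluster, but not with my current test setup.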

@Configuration
public class ObjectTopology {

    @Bean
    public static Serde<Object> objSerde() {
        return new ProtobufSerde<>(Object.parser());
    }

    @Bean
    public Function<KStream<String, Object>, KTable<String, Object>> obj() {
        return objKStream -> objKStream
                .transform((TransformerSupplier<String, Object, KeyValue<String, Object>>) SomeTransformer::new)
                .toTable();
    }
}

application.yaml:

spring.cloud.stream.bindings:
  obj-in-0:
    destination: input-name
  obj-out-0:
    destination: output-name

How can I access the KTable produced by toTable in the code below? Is there a way to use the spring-cloud-stream bindings in my unit test?

ObjectTopology objectTopology = new ObjectTopology();
StreamsBuilder streamsBuilder = new StreamsBuilder();
Serde<String> keySerde = Serdes.String();
Serde<Object> valueSerde = objSerde();
KStream<String, Object> objKStream = streamsBuilder.stream("input-topic-name", Consumed.with(keySerde, valueSerde));
objectTopology.obj().apply(objKStream);
TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build());
TestInputTopic<String, Object> objTestInputTopic = topologyTestDriver.createInputTopic("input-topic-name", keySerde.serializer(), valueSerde.serializer());
KeyValueStore<String, Object> objStore = topologyTestDriver.getKeyValueStore("???"); // I would like to use the name defined by the output binding in application.yaml "output-name"
Object object = createObject();
objTestInputTopic.pipeInput("elem_0", object);
Object result = objStore.get("elem_0");
assertThat(result).isEqualTo(object);

toTable creates an internal topic. You should probably use topologyTestDriver.createOutputTopic to read it.
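
A minimal sketch of reading through an output topic, under some assumptions: plain String values stand in for the protobuf Object, and the test writes the table to "output-name" itself with toStream().to(...), because a hand-built StreamsBuilder topology does not pick up the bindings from application.yaml. The class and method names are hypothetical, and kafka-streams plus kafka-streams-test-utils are assumed to be on the test classpath.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical sketch: String values replace the protobuf type from the question.
public class OutputTopicSketch {

    // Pipes one record through stream -> toTable -> output topic and reads it back.
    static KeyValue<String, String> roundTrip(String key, String value) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder
                .stream("input-topic-name", Consumed.with(Serdes.String(), Serdes.String()))
                .toTable();

        // Assumption: the test wires the table to the output topic by hand.
        table.toStream().to("output-name", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "output-topic-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input-topic-name", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "output-name", Serdes.String().deserializer(), Serdes.String().deserializer());

            in.pipeInput(key, value);
            return out.readKeyValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("elem_0", "value_0"));
    }
}
```

In the question's test the same idea would apply to the KTable returned by objectTopology.obj().apply(objKStream), using the protobuf serde instead of Serdes.String().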

For naming, see https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#toTable--
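
If the goal is to call getKeyValueStore with a known name, one option is the toTable overload that takes a Named and a Materialized. The sketch below is an assumption-laden illustration, not the setup from the question: it uses String values instead of the protobuf Object, and the store name "obj-table-store", the processor name "obj-to-table", and the class name are hypothetical. The store name comes from Materialized here, not from the output binding in application.yaml.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical sketch: names the state store explicitly so the test can look it up.
public class ToTableStoreSketch {

    static final String STORE_NAME = "obj-table-store"; // hypothetical store name

    // Pipes one record into the table and reads the value back from the named store.
    static String readBack(String key, String value) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream(
                "input-topic-name", Consumed.with(Serdes.String(), Serdes.String()));

        // Name both the processor and the backing state store.
        input.toTable(
                Named.as("obj-to-table"),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "to-table-store-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input-topic-name", Serdes.String().serializer(), Serdes.String().serializer());
            in.pipeInput(key, value);

            // Look the store up by the name given to Materialized above.
            KeyValueStore<String, String> store = driver.getKeyValueStore(STORE_NAME);
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        System.out.println(readBack("elem_0", "value_0"));
    }
}
```

Applied to the question's topology, this would mean changing the .toTable() call inside obj() to the two-argument overload, so the production code and the test share the same store name.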
