我有以下代码
@SpringBootApplication
public class EnrichmentProcessPOC {
public static void main(String[] args) {
SpringApplication.run(EnrichmentProcessPOC.class, args);
}
public static class EnrichmentProcessorApplication {
public static final String INPUT_TOPIC = "PAYMENT_MSG";
public static final String OUTPUT_TOPIC = "PAYMENT_MSG_CIF";
@Bean
public Function<KStream<Bytes, String>, KStream<Bytes, String>> process() {
return input -> input.mapValues(value -> "foo");
}
}
}
我的期望是,这应该用";foo";,但它所做的只是复制生产者主题上的消息。我在这里错过了什么,它为什么这么做?
我也尝试过使用Transformer,但它有相同的行为。将消息实际转换为"消息"的最简单方法是什么;foo"?
好吧,我是个白痴,原始代码运行良好,但当我启动Kafka控制台消费者和生产者时,我在同一主题上启动了它们(复制/粘贴错误(。
现在我有了关于不同主题的它们,我的代码就可以按预期运行了。
感谢大家的投入。
您能展示您的配置吗?这对我来说是意料之中的事:
@SpringBootApplication
public class So64158395Application {
public static void main(String[] args) {
SpringApplication.run(So64158395Application.class, args);
}
@Bean
public Function<KStream<byte[], byte[]>, KStream<byte[], byte[]>> process() {
return input -> input.map((key, value) -> KeyValue.pair(key, "foo".getBytes()));
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("in", "bar", "baz");
};
}
@KafkaListener(id = "outListener", topics = "out")
void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println(key + ":" + value);
}
}
spring.cloud.stream.bindings.process-in-0.destination=in
spring.cloud.stream.bindings.process-in-0.group=group
spring.cloud.stream.bindings.process-out-0.destination=out
结果:
bar:foo
您是否尝试过使用函数"mapValues";而不是地图?
根据文件,这是两者之间的区别
映射
将输入流的每个记录转换为输出流中的新记录>(键和值类型都可以任意更改(。
mapValues
将每个输入记录的值转换为输出记录的新值(可能有新的>类型(。