Spring Cloud Stream(Kafka)参数化指定的错误通道{destination}.{group}.er



我正在尝试查看我传递给@ServiceActivator的错误通道是否可以通过引用YAML中指定的值进行绑定/参数化,而不是在代码本身中对实际目的地和使用者组进行硬编码。

@ServiceActivator(
// I do not want to hardcode destination and consumer group here
inputChannel = "stream-test-topic.my-consumer-group.errors"
)
public void handleError(ErrorMessage errorMessage) {
// Getting exception objects
Throwable errorMessagePayload = errorMessage.getPayload();
log.error("exception occurred", errorMessagePayload);
// Get message body
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
log.error("Message Body: {}", originalMessage.getPayload());
} else {
log.error("The message body is empty");
}
}

使用@ServiceActivator无法做到这一点;改为使用Java DSL:

@Value("${error.channel}")
String errors;
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(this.errors)
.handle(msg -> {
System.out.println(msg);
})
.get();
}

并设置

error:
channel: stream-test-topic.my-consumer-group.errors

相关内容

最新更新