春季云流 - 初始化 Kafka 绑定器时的通知



我的春季云流应用程序中有一个简单的Kafka制作器。当我的 spring 应用程序启动时,我有一个@PostConstruct方法,它执行一些协调并尝试将事件发送到 Kafka 生产者。

问题是,当对帐开始将 enets 发送到其中时,我的 Kafka 生产者还没有准备好,导致以下结果:

org.springframework.messaging.MessageDeliveryException:Dispatcher 没有频道 'orderbook-service-1.orderbook' 的订阅者;嵌套的例外是 org.springframework.integration.MessageDispatchingException:Dispatcher 没有订阅者,failedMessage=GenericMessage .. at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77( at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445(

有没有办法在我的应用程序启动期间收到 Kafka 通道已初始化的通知,以便我只启动 rec 作业发布它。

这是我的代码片段:

public interface OrderEventChannel {
String TOPIC_BINDING = "orderbook";
@Output(TOPIC_BINDING)
SubscribableChannel outboundEvent();
}
@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}
@Service
public class OutgoingOrderKafkaProducer {
@Autowired
private OrderEventChannel orderEventChannel;
public void onOrderEvent( ClientEvent clientEvent ) {
try {
Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
subscribableChannel.send( kafkaMsg );
} catch ( RuntimeException rte ) {
log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
}
}
..
..
}
@PostConstruct

在上下文生命周期中开始使用bean还为时过早;它们仍在创建,配置和连接在一起。

您可以使用ApplicationListener(或@EventListener(来侦听ApplicationReadyEvent(请务必将偶数的applicationContext与主应用程序上下文进行比较,因为您可能会获得其他事件(。

你也可以实现SmartLifecycle并将你的代码放在start()中;把你的bean放在一个较晚的Phase,这样它就可以在一切连接后启动。

输出绑定在第Integer.MIN_VALUE + 1000阶段启动,输入绑定在第Integer.MAX_VALUE - 1000阶段启动。

因此,如果您想在消息开始流动之前执行某些操作,请在这些消息之间使用一个阶段(例如 0,这是默认值(。

最新更新