我的春季云流应用程序中有一个简单的Kafka制作器。当我的 spring 应用程序启动时,我有一个@PostConstruct方法,它执行一些协调并尝试将事件发送到 Kafka 生产者。
问题是,当对帐开始将 enets 发送到其中时,我的 Kafka 生产者还没有准备好,导致以下结果:
org.springframework.messaging.MessageDeliveryException:Dispatcher 没有频道 'orderbook-service-1.orderbook' 的订阅者;嵌套的例外是 org.springframework.integration.MessageDispatchingException:Dispatcher 没有订阅者,failedMessage=GenericMessage .. at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77( at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445(
有没有办法在我的应用程序启动期间收到 Kafka 通道已初始化的通知,以便我只启动 rec 作业发布它。
这是我的代码片段:
public interface OrderEventChannel {
String TOPIC_BINDING = "orderbook";
@Output(TOPIC_BINDING)
SubscribableChannel outboundEvent();
}
@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}
@Service
public class OutgoingOrderKafkaProducer {
@Autowired
private OrderEventChannel orderEventChannel;
public void onOrderEvent( ClientEvent clientEvent ) {
try {
Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
subscribableChannel.send( kafkaMsg );
} catch ( RuntimeException rte ) {
log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
}
}
..
..
}
@PostConstruct
在上下文生命周期中开始使用bean还为时过早;它们仍在创建,配置和连接在一起。
您可以使用ApplicationListener
(或@EventListener
(来侦听ApplicationReadyEvent
(请务必将偶数的applicationContext
与主应用程序上下文进行比较,因为您可能会获得其他事件(。
你也可以实现SmartLifecycle
并将你的代码放在start()
中;把你的bean放在一个较晚的Phase
,这样它就可以在一切连接后启动。
输出绑定在第Integer.MIN_VALUE + 1000
阶段启动,输入绑定在第Integer.MAX_VALUE - 1000
阶段启动。
因此,如果您想在消息开始流动之前执行某些操作,请在这些消息之间使用一个阶段(例如 0,这是默认值(。