我有两个IntegrationFlows都从Apache Kafka接收消息
第一个IntegrationFlow在输入通道中,Consumer1(concurrency=4)读取topic_1
第二IntegrationFlow在输入通道中,Consumer2(concurrency=4)读取topic_2
但是这些两个IntegrationFlows,向输出通道发送消息,其中是一个公共类MyMessageHandler指定
:
@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1, "topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.handle(message)
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2, "topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.handle(message)
.get();
}
<<p>类strong> MyMessageHandler 有方法发送(消息),此方法将消息进一步传递给另一个服务
class MyMessageHandler {
protected void handleMessageInternal(Message<?> message)
{
String postResponse = myService.send(message); // remote service calling
msgsStatisticsService.sendMessage(message, postResponse);
// *******
}
}
在每个IntegrationFlow中,4个消费者线程正在工作(总共8个线程),它们都去一个类MyMessageHandler,send()
会有什么问题呢?两个IntegrationFlow,当它们将消息传递给一个公共类时,它们会看到彼此吗?我需要在MyMessageHandler类中提供线程安全吗??我是否需要在send()方法前加上字synchronized???
但是如果我们制作第三个IntegrationFlow呢?
所以只有一个IntegrationFlow可以通过自己传递消息到MyMessageHandler类?那么它是线程安全的吗?例子:
@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1, "topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2, "topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public MessageChannel **SOME_CHANNEL**() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {
return IntegrationFlows
.from(**SOME_CHANNEL**())
.handle(message)
.get();
}
你需要让你的处理程序代码线程安全。
在整个方法中使用synchronized
,您将有效地禁用并发。
最好使用线程安全技术——不使用可变字段或使用有限的同步块,只在关键代码附近使用。