这是我的问题如何在Spring Integration DSL中实现简单的echo套接字服务的变体。引入了一个很好的工作解决方案,但我想探索替代方案。我特别感兴趣的是基于在客户端和服务器实现中显式使用入站和出站通道的解决方案。这可能吗?
到目前为止,我能够想出:
心跳客户端配置
...
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory clientConnectionFactory,
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(outboundChannel)
.handle(Tcp.outboundGateway(clientConnectionFactory))
.channel(inboundChannel)
.get();
}
...
心跳客户端
public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in scheduled intervals in loop
outboudChannel.send(new GenericMessage<String>("status"));
Message<?> message = inboundChannel.receive(1000);
}
客户端部分似乎工作正常。问题出在服务器端。
心跳服务器
public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in some kind of loop
Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
...
outboudChannel.send(new GenericMessage<>("OK"));
...
}
HeartbeatServerConfig
这是最棘手的部分,我确信我错了。我只是不知道我应该怎么做。在这里,我天真地使用客户端实现中的反向方法,它似乎正在工作;在流定义中切换入站和出站通道的意义上相反。
...
@Bean
public IntegrationFlow heartbeatServerFlow(
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(inboundChannel)
.handle(Tcp.inboundGateway(Tcp.netServer(7777)))
.channel(outboundChannel)
.get();
}
...
服务器不工作,抛出关于Found ambiguous parameter type [class java.lang.Boolean] for method match ...
的神秘异常,后跟一长串 Spring 和 Spring 集成方法。
完整的源代码可以在这里找到。
不能使用通道启动服务器端流。
流从网关开始;它处理所有套接字通信。当 收到消息时,它会将其发送到通道。
你可以这样做...
@Bean
public IntegrationFlow server(PollableChannel requests, MessageChannel replies) {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234))
.replyChannel(replies))
.transform(Transformers.objectToString())
.channel(requests)
.get();
}
但我会问你为什么要这样做,因为现在你必须管理自己的线程才能从请求通道接收并写入回复通道。为此,必须将请求消息中的replyChannel
标头复制到回复消息中。事实上,你真的不需要回复渠道;您可以直接将回复发送到replyChannel
标头(这就是内部发生的情况,我们将回复通道桥接到标头通道)。
在网关线程上处理请求要简单得多。
只是为了补充Gary的完美答案,如果有人感兴趣,这里是完整的代码。
我必须明确指定TcpNetServerConnectionFactory
,以ByteArrayLengthHeaderSerializer
设置为序列化程序/反序列化程序。没有它它就不起作用。
心跳服务器配置完整代码
@Bean
public TcpNetServerConnectionFactory connectionFactory() {
TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
connectionFactory.setSerializer(new ByteArrayLengthHeaderSerializer());
connectionFactory.setDeserializer(new ByteArrayLengthHeaderSerializer());
return connectionFactory;
}
@Bean
public IntegrationFlow heartbeatServerFlow(
TcpNetServerConnectionFactory connectionFactory,
PollableChannel inboundChannel,
MessageChannel outboundChannel) {
return IntegrationFlows.from(Tcp.inboundGateway(connectionFactory)
.replyChannel(outboundChannel))
.channel(inboundChannel)
.get();
}
心跳服务器完整代码
public void start() {
Executors.newSingleThreadExecutor().execute(() -> {
while (true) {
try {
Message<?> request = inboundChannel.receive();
if (request == null) {
log.error("Heartbeat timeouted");
} else {
MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel();
String requestPayload = new String((byte[]) request.getPayload());
if (requestPayload.equals("status")) {
log.info("Heartbeat received");
outboudChannel.send(new GenericMessage<>("OK"));
} else {
log.error("Unexpected message content from client: " + requestPayload);
}
}
} catch (Exception e) {
log.error(e);
}
}
});
}
当然,关键的一点是从请求消息本身获取出站通道,如下所示:MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel()
完整的代码可以在这里找到。