我正在尝试创建一个集成流程:
Connection A : [ ActiveMq (queue1) ---> TCP Server (1111) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (1111))] (Application on different Technology (VB))
Connection B : [ ActiveMq (queue2) ---> TCP Server (2222) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (2222))] (Application on different Technology (VB))
所以上面的流程描述了以下场景:
-
我有一个弹簧启动应用程序(比如springUtility(,它与其他一些技术上的外部应用程序(比如外部实用程序(连接。
-
springutility 和 externalUtility 之间的连接是通过 TCP 协议实现的。
-
两个实用程序之间将有多个连接,如上面的流程图连接 A 和连接 B 所示。
-
消息流将从 springUtilty 到外部实用程序(单向(
-
现在,外部实用程序可以在客户端模式或服务器模式下运行。 所以,如果外部实用程序在服务器模式下运行,那么我的 springutility 将处于客户端模式(完成(,也可以创建多个连接 我在循环代码中运行以下代码:
法典:
flow = IntegrationFlows
.from(Jms.inboundAdapter(ConnectionFactory())
.destination("rec"))
.channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.nioClient("172.18.11.22",5555)
.serializer(customSerializer)
.deserializer(customSerializer)
.id(hostConnection.getConnectionNumber()).soTimeout(10000)))
.get();
flowRegistration = this.flowContext.registration(flow).id("in.flow").register();
问题:当外部实用程序在客户端模式下运行时,Spring 实用程序应该能够创建一个服务器,外部实用程序将在其上连接,然后它开始从 spring 实用程序接收消息: 我试图以错误的方式实施。
所以,最后我必须实现上面的连接作为测试用例来验证流程, 表示 Spring 实用程序已打开服务器端口(1111 和 2222(,外部应用程序与这两个端口连接并开始从队列 1 和队列 2 重复接收来自 Spring 实用程序的消息。两个连接都将有单独的流(一旦创建了单个流,那么我们就可以使用 for 循环来创建不需要的连接(。
法典:
flow = IntegrationFlows
.from(Jms.inboundAdapter(ConnectionFactory())
.destination("rec"))
.channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.netServer(5555)
.serializer(customSerializer)
.deserializer(customSerializer)
.id(hostConnection.getConnectionNumber()).soTimeout(10000)))
.get();
flowRegistration = this.flowContext.registration(flow).id("in.flow").register();
' 法典:
2018-05-30 17:39:39.171 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : Adding {jms:outbound-channel-adapter} as a subscriber to the 'Conn1311out.flow.channel#0' channel
2018-05-30 17:39:39.172 INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel : Channel 'application.Conn1311out.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:39.172 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2018-05-30 17:39:41.965 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the 'Conn1311in.flow.channel#0' channel
2018-05-30 17:39:41.965 INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel : Channel 'application.Conn1311in.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:41.966 INFO 15776 --- [nio-8080-exec-1] .s.i.i.t.c.TcpNetServerConnectionFactory : started Conn1311, port=3333
2018-05-30 17:39:41.966 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
2018-05-30 17:39:41.966 INFO 15776 --- [pool-1-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Conn1311, port=3333 No listener bound to server connection factory; will not read; exiting...
2018-05-30 17:39:41.971 INFO 15776 --- [nio-8080-exec-1] o.s.i.e.SourcePollingChannelAdapter : started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
它首先在提供的端口上创建服务器,但退出并显示以下消息
No listener bound to server connection factory; will not read; exiting...
当我创建客户端时,它工作正常,但在创建服务器的情况下,它会给出错误
编辑:
我使用以下方法来创建动态不同的服务器,并从其各自的activemq客户端向连接的客户端发送消息
我已经创建了一个入站适配器,如下所示:
IntegrationFlow flow; CustomSerializer customSerializer = getCustomSerializer(String.valueOf(hostConnection.getMaxMessageLength()), hostConnection.getTerminatorChar()); TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); TcpNetServerConnectionFactory cf = new TcpNetServerConnectionFactory(1234)); cf.setSerializer(customSerializer); cf.setDeserializer(customSerializer); handler.setConnectionFactory(cf); flow = IntegrationFlows .from(Tcp.inboundAdapter(cf).id("adapter")).handle(handler) .get(); this.flowContext.registration(flow).id("inflow") .addBean(hostConnection.getConnectionNumber(),cf) .addBean(hostConnection.getConnectionNumber()+"handler",handler) .register();
我创建了一个 Jmsflow,其中来自队列的消息被发送到路由器,如下所示:
flow = IntegrationFlows .from(Jms.inboundAdapter(activeMQConnectionFactory) .destination(hostConnection.getConnectionNumber() +"rec")) .channel(directChannel()).route(new ServerRouter()) .get();
当客户端连接到我的服务器时,我从侦听器
handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event)
创建一个流,并创建一个流:void createServerFlow(TcpConnectionOpenEvent event) { String connectionNumber = event.getConnectionFactoryName(); TcpNetConnection server = (TcpNetConnection) event.getSource(); TcpSendingMessageHandler handler = (TcpSendingMessageHandler) ac.getBean(connectionNumber + "handler"); IntegrationFlow flow = f -> f.enrichHeaders(e -> e.header(IpHeaders.CONNECTION_ID, event.getConnectionId())) .handle(handler); IntegrationFlowRegistration flowregister = this.flowContext.registration(flow).id("outclient").register(); MessageChannel channel = flowregister.getInputChannel(); this.subFlows.put(connectionNumber + "server", channel); }
我的服务器路由器找到频道并发送到目标频道
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { String serverChannel = (String) message.getHeaders().get("connectionnumber"); MessageChannel channel = HostConnectionRepository.subFlows .get(serverChannel+"server"); return Collections.singletonList(channel);
}
当我断开连接时,我从
step 3
中创建的 flowContext 中删除了流。
问题:当我删除流时,与流关联的 bean 也会被删除,因为它写在文档中,相关 bean 也会丢弃。
这删除了我的处理程序 bean,它关闭了 TCP 适配器和连接工厂,因此一旦断开连接,它就无法再次连接
如何在不丢弃处理程序 Bean 的情况下丢弃输出客户端流?
Tcp.outboundAdapter()
不是TcpListener
.它只是 TCP 套接字的发送方。这就是你得到这个例外的原因。
如果我们谈论TCP服务器,它必须是一个侦听器,一个入站端点。
完全不清楚为什么要在发送操作上动态创建 TCP 服务器......
你需要重新思考你的逻辑,并拥有这样一个TCP服务器来侦听这个动态逻辑之外。这里将仅使用客户端变体将 TCP 数据包发送到所需的 TCP 端口。
目前尚不清楚您要做什么; 客户端通常会启动连接,而不是为其他人提供要连接的端口。
即使有人连接到该端口,客户端也不知道要将消息发送到哪个套接字。所以你会在运行时得到错误。
没有侦听器绑定到服务器连接工厂;不会读取;正在退出...
该消息只是说没有向工厂注册以接收传入消息的组件。