配置动态TCP服务器以使用Java DSL将消息发送到客户端连接的Spring Integration



我正在尝试创建一个集成流程:

Connection A : [ ActiveMq (queue1) ---> TCP Server (1111) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (1111))] (Application on different Technology (VB))

Connection B : [ ActiveMq (queue2) ---> TCP Server (2222) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (2222))] (Application on different Technology (VB))

所以上面的流程描述了以下场景:

  1. 我有一个弹簧启动应用程序(比如springUtility(,它与其他一些技术上的外部应用程序(比如外部实用程序(连接。

  2. springutility 和 externalUtility 之间的连接是通过 TCP 协议实现的。

  3. 两个实用程序之间将有多个连接,如上面的流程图连接 A 和连接 B 所示。

  4. 消息流将从 springUtilty 到外部实用程序(单向(

  5. 现在,外部实用程序可以在客户端模式或服务器模式下运行。 所以,如果外部实用程序在服务器模式下运行,那么我的 springutility 将处于客户端模式(完成(,也可以创建多个连接 我在循环代码中运行以下代码:

法典:

flow = IntegrationFlows
.from(Jms.inboundAdapter(ConnectionFactory())
.destination("rec"))
.channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.nioClient("172.18.11.22",5555)
.serializer(customSerializer)
.deserializer(customSerializer)
.id(hostConnection.getConnectionNumber()).soTimeout(10000)))
.get();
flowRegistration = this.flowContext.registration(flow).id("in.flow").register();
  1. 问题:当外部实用程序在客户端模式下运行时,Spring 实用程序应该能够创建一个服务器,外部实用程序将在其上连接,然后它开始从 spring 实用程序接收消息: 我试图以错误的方式实施。

  2. 所以,最后我必须实现上面的连接作为测试用例来验证流程, 表示 Spring 实用程序已打开服务器端口(1111 和 2222(,外部应用程序与这两个端口连接并开始从队列 1 和队列 2 重复接收来自 Spring 实用程序的消息。两个连接都将有单独的流(一旦创建了单个流,那么我们就可以使用 for 循环来创建不需要的连接(。

法典:

flow = IntegrationFlows
.from(Jms.inboundAdapter(ConnectionFactory())
.destination("rec"))
.channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.netServer(5555)
.serializer(customSerializer)
.deserializer(customSerializer)
.id(hostConnection.getConnectionNumber()).soTimeout(10000)))
.get();
flowRegistration = this.flowContext.registration(flow).id("in.flow").register();

' 法典:

2018-05-30 17:39:39.171  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : Adding {jms:outbound-channel-adapter} as a subscriber to the 'Conn1311out.flow.channel#0' channel
2018-05-30 17:39:39.172  INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel    : Channel 'application.Conn1311out.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:39.172  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2018-05-30 17:39:41.965  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the 'Conn1311in.flow.channel#0' channel
2018-05-30 17:39:41.965  INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel    : Channel 'application.Conn1311in.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:41.966  INFO 15776 --- [nio-8080-exec-1] .s.i.i.t.c.TcpNetServerConnectionFactory : started Conn1311, port=3333
2018-05-30 17:39:41.966  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
2018-05-30 17:39:41.966  INFO 15776 --- [pool-1-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Conn1311, port=3333 No listener bound to server connection factory; will not read; exiting...
2018-05-30 17:39:41.971  INFO 15776 --- [nio-8080-exec-1] o.s.i.e.SourcePollingChannelAdapter      : started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0

它首先在提供的端口上创建服务器,但退出并显示以下消息

No listener bound to server connection factory; will not read; exiting...

当我创建客户端时,它工作正常,但在创建服务器的情况下,它会给出错误

编辑:

我使用以下方法来创建动态不同的服务器,并从其各自的activemq客户端向连接的客户端发送消息

  1. 我已经创建了一个入站适配器,如下所示:

    IntegrationFlow flow;
    CustomSerializer customSerializer = getCustomSerializer(String.valueOf(hostConnection.getMaxMessageLength()),
    hostConnection.getTerminatorChar());
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    TcpNetServerConnectionFactory cf = new TcpNetServerConnectionFactory(1234));
    cf.setSerializer(customSerializer);
    cf.setDeserializer(customSerializer);
    handler.setConnectionFactory(cf);
    flow = IntegrationFlows
    .from(Tcp.inboundAdapter(cf).id("adapter")).handle(handler)
    .get();
    this.flowContext.registration(flow).id("inflow")
    .addBean(hostConnection.getConnectionNumber(),cf)
    .addBean(hostConnection.getConnectionNumber()+"handler",handler)
    .register();
    
  2. 我创建了一个 Jmsflow,其中来自队列的消息被发送到路由器,如下所示:

    flow = IntegrationFlows
    .from(Jms.inboundAdapter(activeMQConnectionFactory)
    .destination(hostConnection.getConnectionNumber() +"rec")) 
    .channel(directChannel()).route(new ServerRouter())
    .get(); 
    
  3. 当客户端连接到我的服务器时,我从侦听器handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event)创建一个流,并创建一个流:

    void createServerFlow(TcpConnectionOpenEvent event) {
    String connectionNumber = event.getConnectionFactoryName();
    TcpNetConnection server = (TcpNetConnection) event.getSource();
    TcpSendingMessageHandler handler = (TcpSendingMessageHandler) ac.getBean(connectionNumber + "handler");
    IntegrationFlow flow = f -> f.enrichHeaders(e -> e.header(IpHeaders.CONNECTION_ID, event.getConnectionId()))
    .handle(handler);
    IntegrationFlowRegistration flowregister = this.flowContext.registration(flow).id("outclient").register();
    MessageChannel channel = flowregister.getInputChannel();
    this.subFlows.put(connectionNumber + "server", channel);
    }
    
  4. 我的服务器路由器找到频道并发送到目标频道

    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
    String serverChannel = (String) message.getHeaders().get("connectionnumber");
    MessageChannel channel = HostConnectionRepository.subFlows
    .get(serverChannel+"server");
    return Collections.singletonList(channel);
    

    }

  5. 当我断开连接时,我从step 3中创建的 flowContext 中删除了流。

问题:当我删除流时,与流关联的 bean 也会被删除,因为它写在文档中,相关 bean 也会丢弃。

这删除了我的处理程序 bean,它关闭了 TCP 适配器和连接工厂,因此一旦断开连接,它就无法再次连接

如何在不丢弃处理程序 Bean 的情况下丢弃输出客户端流?

Tcp.outboundAdapter()不是TcpListener.它只是 TCP 套接字的发送方。这就是你得到这个例外的原因。

如果我们谈论TCP服务器,它必须是一个侦听器,一个入站端点。

完全不清楚为什么要在发送操作上动态创建 TCP 服务器......

你需要重新思考你的逻辑,并拥有这样一个TCP服务器来侦听这个动态逻辑之外。这里将仅使用客户端变体将 TCP 数据包发送到所需的 TCP 端口。

目前尚不清楚您要做什么; 客户端通常会启动连接,而不是为其他人提供要连接的端口。

即使有人连接到该端口,客户端也不知道要将消息发送到哪个套接字。所以你会在运行时得到错误。

没有侦听器绑定到服务器连接工厂;不会读取;正在退出...

该消息只是说没有向工厂注册以接收传入消息的组件。

相关内容

  • 没有找到相关文章

最新更新