如何使用QueueChannel和ServiceActivator正确配置TCP inboundAdapter



我正在尝试配置一个TCP套接字,该套接字在不同的消息中接收格式为name,value的数据。这些信息平均每秒到达一次,有时更快,有时更慢。

我能够设置一个工作配置,但我对Spring Integration中实际发生的事情缺乏基本的了解。

我的配置文件如下所示:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}

CSVProcessingService看起来是这样的(缩写(:

@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}

我对此配置的目标如下:

  • 在配置的端口上接收消息
  • 承受更高的负载而不会丢失消息
  • 反序列化消息
  • 将它们放入队列通道
  • (最好也记录错误(
  • 每50ms轮询一次队列通道,并且将来自队列通道的消息传递给ObjectToStringTransformer
  • 在转换器之后,转换后的消息被传递给CSVProcessingService进行进一步处理

我是正确地实现了所有这些目标,还是因为误解了Spring Integration而犯了错误?有可能以某种方式将Poller@ServiceActivator组合起来吗?

此外,我在可视化我配置的IntegrationFlow实际上是如何"流动"的方面遇到了一个问题,也许有人可以帮助我更好地理解这一点。

编辑:

在Artems发表评论后,我重新设计了我的配置。现在看起来是这样的:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Value("${tcp.socket.server.port}") int port;
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
tcpNioServer()
)
.autoStartup(true)
.errorChannel(errorChannel())
)
.transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean
public AbstractServerConnectionFactory tcpNioServer()
{
return Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
.taskExecutor(
new ThreadPoolExecutor(0, 20,
30L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DefaultThreadFactory("TCP-POOL"))
).get();
}
@Bean
public MessageChannel errorChannel()
{
return MessageChannels.direct("errors").get();
}
@Bean
public IntegrationFlow errorHandling()
{
return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}

我还从CSVProcessingService#process方法中删除了@ServiceActivator注释。

不确定是什么让您感到困惑,但您的配置和逻辑看起来不错。

您可能会忽略这样一个事实,即您不需要介于两者之间的QueueChannel,因为AbstractConnectionFactory.processNioSelections()已经是多线程的,并且它会安排任务从套接字中读取消息。因此,您只需要为Tcp.nioServer()配置一个适当的Executor。尽管默认情况下它是Executors.newCachedThreadPool()

另一方面,使用内存中的QueueChannel,您确实可能会丢失消息,因为它们已经从网络中读取。

当您使用JavaDSL时,应该考虑在端点上使用poller()选项。如果您有inputChannel属性,@Poller将在@ServiceActivator上工作,但在handle()中使用相同的属性将覆盖该inputChannel,因此不会应用@Poller。不要把Java DSL和注释配置混为一谈!

在您的配置中,其他一切都很好。

最新更新