我正在尝试配置一个TCP套接字,该套接字在不同的消息中接收格式为name,value
的数据。这些信息平均每秒到达一次,有时更快,有时更慢。
我能够设置一个工作配置,但我对Spring Integration中实际发生的事情缺乏基本的了解。
我的配置文件如下所示:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
CSVProcessingService
看起来是这样的(缩写(:
@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}
我对此配置的目标如下:
- 在配置的端口上接收消息
- 承受更高的负载而不会丢失消息
- 反序列化消息
- 将它们放入队列通道
- (最好也记录错误(
- 每50ms轮询一次队列通道,并且将来自队列通道的消息传递给
ObjectToStringTransformer
- 在转换器之后,转换后的消息被传递给
CSVProcessingService
进行进一步处理
我是正确地实现了所有这些目标,还是因为误解了Spring Integration而犯了错误?有可能以某种方式将Poller
和@ServiceActivator
组合起来吗?
此外,我在可视化我配置的IntegrationFlow实际上是如何"流动"的方面遇到了一个问题,也许有人可以帮助我更好地理解这一点。
编辑:
在Artems发表评论后,我重新设计了我的配置。现在看起来是这样的:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Value("${tcp.socket.server.port}") int port;
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
tcpNioServer()
)
.autoStartup(true)
.errorChannel(errorChannel())
)
.transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean
public AbstractServerConnectionFactory tcpNioServer()
{
return Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
.taskExecutor(
new ThreadPoolExecutor(0, 20,
30L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DefaultThreadFactory("TCP-POOL"))
).get();
}
@Bean
public MessageChannel errorChannel()
{
return MessageChannels.direct("errors").get();
}
@Bean
public IntegrationFlow errorHandling()
{
return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
我还从CSVProcessingService#process
方法中删除了@ServiceActivator
注释。
不确定是什么让您感到困惑,但您的配置和逻辑看起来不错。
您可能会忽略这样一个事实,即您不需要介于两者之间的QueueChannel
,因为AbstractConnectionFactory.processNioSelections()
已经是多线程的,并且它会安排任务从套接字中读取消息。因此,您只需要为Tcp.nioServer()
配置一个适当的Executor
。尽管默认情况下它是Executors.newCachedThreadPool()
。
另一方面,使用内存中的QueueChannel
,您确实可能会丢失消息,因为它们已经从网络中读取。
当您使用JavaDSL时,应该考虑在端点上使用poller()
选项。如果您有inputChannel
属性,@Poller
将在@ServiceActivator
上工作,但在handle()
中使用相同的属性将覆盖该inputChannel
,因此不会应用@Poller
。不要把Java DSL和注释配置混为一谈!
在您的配置中,其他一切都很好。