弹簧集成在分裂大流时背压误差



目标是从Web服务器直接直接直接下载一个大型JSON.GZ文件(4 GB压缩,大约12 GB未压缩,1200万行)到数据库。由于Spring Integration出站网关不支持GZIP格式,因此我自己使用OKHTTP自动进行响应:

自动进行操作:
body = response.body().byteStream(); // thanks okhttp
reader = new InputStreamReader(body, StandardCharsets.UTF_8);
br = new BufferedReader(reader, bufferSize);
Flux<String> flux = Flux.fromStream(br.lines())
    .onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
    .doAfterTerminate(() -> closeQuietly(closeables))
    .doOnError(t -> log.error(...))

在集成流中:

.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())

但是

2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1]; 
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...), 
failedMessage=...}]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)

使用桥梁轮询的无界队列在运行时连接输出通道。这是为了促进测试,使队列可以用DirectChannel代替用于测试。

@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
    return new QueueChannel();
}
@Bean
public IntegrationFlow srcToSinkBridge() {
    return IntegrationFlows.from(streamingOutputChannel())
        .bridge(e -> e.poller(Pollers.fixedDelay(500)))
        .channel(repositoryInputChannel())
        .get();
}

我在这里有几个疑问。

  1. 我不确定使用bean名称中使用SPEL的动态绑定是否有效,但我不知道如何验证它。
  2. 由于队列是无限的,所以我能想到的是,民意调查还不够快。但是,例外表明分离器正在遇到问题。

问题是log语句!它使用窃听将分离器的输出通道更改为DirectChannel,该频率使AbstractMessageSplitter.

弄乱了逻辑
boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel;

引用文档:

从版本5.0开始,...如果分离器的输出频道是一个 ReactiveStreamsSubScribableChannel的实例, AbstractMessagesPlitter会产生磁通结果,而不是迭代器 并将输出通道订阅此通量以进行后压 基于下游流量需求。

工作代码如下 - 只需将日志语句从分离器之后的立即移动到末端,固定了背压问题:

IntegrationFlows.from(inputChannel)
.filter(Message.class, msg -> msg.getHeaders().containsKey(FILE_TYPE_HEADER))
.handle(new GzipToFluxTransformer(...))
.transform((Flux<String> payload) -> payload
        .onBackpressureBuffer(getOnBackpressureBufferSize(),
                s -> log.error("Buffer overrun!")))
.split()
.channel(c -> c.flux(outputChannel))
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.get();

我已经在Spring Integration Github上打开了第2302期。

最新更新