Spring集成——在后台使用http请求处理消息网关异步处理发送即时响应



我们有一个需求,其中客户端正在调用我们的spring集成http入站网关之一,给API的输入是.csv格式的,并且一旦验证请求并发现正确,应立即发送状态为200 OK的响应。如果发生错误,则发送适当的错误消息。我们使用直接通道和执行通道的组合来进行异步处理。在使用spring boot父版本1.2.5时可以正常工作,但在升级到1.4.0版本时会失败。我们总是得到一个500内部服务器错误,从日志中发现的原因是MessageTimeoutException。

我们正在使用基于java的配置,配置如下。

pom.xml

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>
            <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
            <version>4.3.1.RELEASE</version>
        </dependency>
@Configuration
public class ApplicationIntegrationConfig {    
    @Bean
    public HttpRequestHandlingMessagingGateway httpMessageGateway(){
        HttpRequestHandlingMessagingGateway gateway
                = new HttpRequestHandlingMessagingGateway(Boolean.TRUE);
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setPathPatterns("/org/{orgId}/users");
        requestMapping.setHeaders("Content-Type=text/csv");
        gateway.setRequestMapping(requestMapping);
        gateway.setRequestChannel(onBoardUserRequestChannel());
        Map<String, Expression> customHeaderExpressions = new HashMap<>();
        customHeaderExpressions.put("orgId", new SpelExpressionParser().
                parseExpression("#pathVariables.orgId"));
        gateway.setHeaderExpressions(customHeaderExpressions);
        gateway.setErrorChannel(errorChannel());
        gateway.setReplyTimeout(0);
        return gateway;
    }    
    @Bean
    public MessageChannel processUserRequestChannel() {
        DirectChannel channel =new DirectChannel();
        channel.addInterceptor(new AuthenticationInterceptor());
        return channel;
    }
    @Bean
    public MessageChannel routeChannel() {
        return new ExecutorChannel(Executors.newCachedThreadPool());
    }
    @Bean
    public MessageChannel addUserChannel() {
        return new ExecutorChannel(Executors.newCachedThreadPool());
    }
    @Bean
    public MessageChannel removeUserChannel() {
        return new ExecutorChannel(Executors.newCachedThreadPool());
    }
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }    
}

分配器

@MessageEndpoint
public class PartnerUserOnBoardSplitter {     
    @Splitter(inputChannel= "processUserRequestChannel", outputChannel="routeChannel")
    public List<UserDTO> split(Message message) throws ApplicationException {
        List<UserDTO> userList = null;
        try {
            userList = validateAndCreateDTO(message);
        } 
        } catch(Exception ex) {
            throw new ApplicationException("<Message>");
        }
        return userList;
    }    
}
路由器

@MessageEndpoint
public class CustomRouter {
    @Router(inputChannel="routeChannel")
    public String resolveRoute(UserDTO dto) {
        return (Operation.ADD.equals(dto.getOperation())) ? "addUserChannel" : "removeUserChannel";
    }    
}
public class ServiceActivator{
@ServiceActivator(inputChannel = "addUserChannel")
public addUser(UserDto dto){
//process add
}
@ServiceActivator(inputChannel = "removeUserChannel")
public removeUser(UserDto dto){
//process remove
}    
}

这是对Spring Integration 4.2 JIRA的增强/改进。

在此之前,如果预期的回复超时了,用户错误地得到200 OK。现在他得到了一个500的超时异常。

由于您将网关配置为期望回复,因此触发此功能。

简单地将网关配置为不期望回复…

HttpRequestHandlingMessagingGateway gateway
            = new HttpRequestHandlingMessagingGateway(false);

您还将看到更快的响应,因为容器线程不会等待永远不会到来的响应(默认超时为1秒)。

编辑

如果您有时想发送回复,而不是在其他时间,将expectReply设置为true并添加以下配置:

gateway.setStatusCodeExpression(new SpelExpressionParser().parseExpression("200"));
gateway.setReplyTimeout(0);

我添加了一些javadocs -希望有帮助。

对于那些使用XML配置的,它是…

<int-http:inbound-gateway request-channel="receiveChannel"
                      path="/receiveGateway"
                      reply-timeout="0"
                      reply-timeout-status-code-expression="200"
                      supported-methods="POST"/>

最新更新