我们有一个需求,其中客户端正在调用我们的spring集成http入站网关之一,给API的输入是.csv格式的,并且一旦验证请求并发现正确,应立即发送状态为200 OK的响应。如果发生错误,则发送适当的错误消息。我们使用直接通道和执行通道的组合来进行异步处理。在使用spring boot父版本1.2.5时可以正常工作,但在升级到1.4.0版本时会失败。我们总是得到一个500内部服务器错误,从日志中发现的原因是MessageTimeoutException。
我们正在使用基于java的配置,配置如下。
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
</parent>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
<version>4.3.1.RELEASE</version>
</dependency>
@Configuration
public class ApplicationIntegrationConfig {
@Bean
public HttpRequestHandlingMessagingGateway httpMessageGateway(){
HttpRequestHandlingMessagingGateway gateway
= new HttpRequestHandlingMessagingGateway(Boolean.TRUE);
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setPathPatterns("/org/{orgId}/users");
requestMapping.setHeaders("Content-Type=text/csv");
gateway.setRequestMapping(requestMapping);
gateway.setRequestChannel(onBoardUserRequestChannel());
Map<String, Expression> customHeaderExpressions = new HashMap<>();
customHeaderExpressions.put("orgId", new SpelExpressionParser().
parseExpression("#pathVariables.orgId"));
gateway.setHeaderExpressions(customHeaderExpressions);
gateway.setErrorChannel(errorChannel());
gateway.setReplyTimeout(0);
return gateway;
}
@Bean
public MessageChannel processUserRequestChannel() {
DirectChannel channel =new DirectChannel();
channel.addInterceptor(new AuthenticationInterceptor());
return channel;
}
@Bean
public MessageChannel routeChannel() {
return new ExecutorChannel(Executors.newCachedThreadPool());
}
@Bean
public MessageChannel addUserChannel() {
return new ExecutorChannel(Executors.newCachedThreadPool());
}
@Bean
public MessageChannel removeUserChannel() {
return new ExecutorChannel(Executors.newCachedThreadPool());
}
@Bean
public MessageChannel errorChannel() {
return new DirectChannel();
}
}
分配器
@MessageEndpoint
public class PartnerUserOnBoardSplitter {
@Splitter(inputChannel= "processUserRequestChannel", outputChannel="routeChannel")
public List<UserDTO> split(Message message) throws ApplicationException {
List<UserDTO> userList = null;
try {
userList = validateAndCreateDTO(message);
}
} catch(Exception ex) {
throw new ApplicationException("<Message>");
}
return userList;
}
}
路由器@MessageEndpoint
public class CustomRouter {
@Router(inputChannel="routeChannel")
public String resolveRoute(UserDTO dto) {
return (Operation.ADD.equals(dto.getOperation())) ? "addUserChannel" : "removeUserChannel";
}
}
public class ServiceActivator{
@ServiceActivator(inputChannel = "addUserChannel")
public addUser(UserDto dto){
//process add
}
@ServiceActivator(inputChannel = "removeUserChannel")
public removeUser(UserDto dto){
//process remove
}
}
这是对Spring Integration 4.2 JIRA的增强/改进。
在此之前,如果预期的回复超时了,用户错误地得到200 OK。现在他得到了一个500的超时异常。
由于您将网关配置为期望回复,因此触发此功能。
简单地将网关配置为不期望回复…
HttpRequestHandlingMessagingGateway gateway
= new HttpRequestHandlingMessagingGateway(false);
您还将看到更快的响应,因为容器线程不会等待永远不会到来的响应(默认超时为1秒)。
编辑
如果您有时想发送回复,而不是在其他时间,将expectReply
设置为true并添加以下配置:
gateway.setStatusCodeExpression(new SpelExpressionParser().parseExpression("200"));
gateway.setReplyTimeout(0);
我添加了一些javadocs -希望有帮助。
对于那些使用XML配置的,它是…
<int-http:inbound-gateway request-channel="receiveChannel"
path="/receiveGateway"
reply-timeout="0"
reply-timeout-status-code-expression="200"
supported-methods="POST"/>