如何在异步任务执行器中启用请求作用域



在我的应用程序我有一些异步web服务。服务器接受请求,返回OK响应,并使用AsyncTaskExecutor开始处理请求。我的问题是如何在这里启用请求作用域,因为在此处理中,我需要获得由注释的类:

@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)

现在得到exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.requestContextImpl': Scope 'request' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.

因为它运行在SimpleAsyncTaskExecutor而不是DispatcherServlet

my异步处理请求

taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        asyncRequest(request);
    }
});

where taskExecutor is:

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

我们遇到了同样的问题-需要在后台使用@Async执行代码,所以它无法使用任何Session-或RequestScope bean。我们用下面的方法解决了这个问题:

  • 创建一个自定义TaskPoolExecutor,该TaskPoolExecutor使用任务
  • 存储范围信息
  • 创建一个特殊的Callable(或Runnable),它使用这些信息来设置和清除后台线程的上下文
  • 创建覆盖配置以使用自定义执行器

注意:这只适用于Session和Request作用域的bean,而不适用于安全上下文(如Spring security)。你必须使用另一个方法来设置安全上下文,如果这是你所追求的。

Note2:为简洁起见,只显示Callable和submit()实现。您可以对Runnable和execute()执行相同的操作。

代码如下:

执行人:

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }
}

调用:

public class ContextAwareCallable<T> implements Callable<T> {
    private Callable<T> task;
    private RequestAttributes context;
    public ContextAwareCallable(Callable<T> task, RequestAttributes context) {
        this.task = task;
        this.context = context;
    }
    @Override
    public T call() throws Exception {
        if (context != null) {
            RequestContextHolder.setRequestAttributes(context);
        }
        try {
            return task.call();
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    }
}
配置:

@Configuration
public class ExecutorConfig extends AsyncConfigurerSupport {
    @Override
    @Bean
    public Executor getAsyncExecutor() {
        return new ContextAwarePoolExecutor();
    }
}

最简单的方法是使用这样的任务装饰器:

static class ContextCopyingDecorator implements TaskDecorator {
    @Nonnull
    @Override
    public Runnable decorate(@Nonnull Runnable runnable) {
        RequestAttributes context =
                RequestContextHolder.currentRequestAttributes();
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(context);
                MDC.setContextMap(contextMap);
                runnable.run();
            } finally {
                MDC.clear();
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

要将这个装饰器添加到任务执行器,只需要在配置例程中添加它:

@Override
@Bean
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor poolExecutor = new ThreadPoolTaskExecutor();
    poolExecutor.setTaskDecorator(new ContextCopyingDecorator());
    poolExecutor.initialize();
    return poolExecutor;
}

不需要额外的持有者或自定义线程池任务执行器。


2021年的小更新:使用当前版本的Spring Boot,仅仅存在一个类型为TaskDecorator的bean就足够了。在创建上下文之后,任务装饰器将用于装饰Spring Boot创建的执行器。

前面提到的解决方案对我不起作用。解决方案不起作用的原因是,正如@Thilak的帖子中提到的,一旦原始父线程向客户端提交响应,请求对象可能会被垃圾收集。但通过对@Armadillo提供的解决方案进行一些调整,我能够让它工作。我使用的是spring boot 2.2

这是我所遵循的。

  • 创建一个自定义TaskPoolExecutor来存储(克隆后)作用域信息与任务。
  • 创建一个特殊的Callable(或Runnable)它使用克隆信息来设置当前上下文值为异步线程清除上下文。

执行器(与@Armadillo的帖子相同):

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }
}

调用:

public class ContextAwareCallable<T> implements Callable<T> {
  private Callable<T> task;
  private final RequestAttributes requestAttributes;
  public ContextAwareCallable(Callable<T> task, RequestAttributes requestAttributes) {
    this.task = task;
    this.requestAttributes = cloneRequestAttributes(requestAttributes);
  }
  @Override
  public T call() throws Exception {
    try {
      RequestContextHolder.setRequestAttributes(requestAttributes);
      return task.call();
    } finally {
        RequestContextHolder.resetRequestAttributes();
    }
  }
  private RequestAttributes cloneRequestAttributes(RequestAttributes requestAttributes){
    RequestAttributes clonedRequestAttribute = null;
    try{
      clonedRequestAttribute = new ServletRequestAttributes(((ServletRequestAttributes) requestAttributes).getRequest(), ((ServletRequestAttributes) requestAttributes).getResponse());
      if(requestAttributes.getAttributeNames(RequestAttributes.SCOPE_REQUEST).length>0){
        for(String name: requestAttributes.getAttributeNames(RequestAttributes.SCOPE_REQUEST)){
          clonedRequestAttribute.setAttribute(name,requestAttributes.getAttribute(name,RequestAttributes.SCOPE_REQUEST),RequestAttributes.SCOPE_REQUEST);
        }
      }
      if(requestAttributes.getAttributeNames(RequestAttributes.SCOPE_SESSION).length>0){
        for(String name: requestAttributes.getAttributeNames(RequestAttributes.SCOPE_SESSION)){
          clonedRequestAttribute.setAttribute(name,requestAttributes.getAttribute(name,RequestAttributes.SCOPE_SESSION),RequestAttributes.SCOPE_SESSION);
        }
      }
      if(requestAttributes.getAttributeNames(RequestAttributes.SCOPE_GLOBAL_SESSION).length>0){
        for(String name: requestAttributes.getAttributeNames(RequestAttributes.SCOPE_GLOBAL_SESSION)){
          clonedRequestAttribute.setAttribute(name,requestAttributes.getAttribute(name,RequestAttributes.SCOPE_GLOBAL_SESSION),RequestAttributes.SCOPE_GLOBAL_SESSION);
        }
      }
      return clonedRequestAttribute;
    }catch(Exception e){
      return requestAttributes;
    }
  }
}

我所做的更改是引入cloneRequestAttributes()来复制和设置RequestAttribute,以便即使在原始父线程向客户端提交响应之后,这些值仍然可用。

配置:因为有其他异步配置,我不希望这种行为适用于其他异步执行器,我已经创建了自己的任务执行器配置。

@Configuration
@EnableAsync
public class TaskExecutorConfig {
    @Bean(name = "contextAwareTaskExecutor")
    public TaskExecutor getContextAwareTaskExecutor() {
        ContextAwarePoolExecutor taskExecutor = new ConAwarePoolExecutor();
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setThreadNamePrefix("ContextAwareExecutor-");
        return taskExecutor;
    }
}

最后在async方法上,我使用了执行者的名字

    @Async("contextAwareTaskExecutor")
    public void asyncMethod() {
    }

备选解决方案:

我们在尝试重用现有的组件类时遇到了这个麻烦。虽然这个解决方案看起来很方便。如果我们可以将相关的请求作用域值引用为方法参数,那么麻烦(克隆对象和保留线程池)就会少得多。在我们的示例中,我们计划以这样一种方式重构代码,即使用请求作用域bean并从异步方法中重用的组件类接受这些值作为方法参数。请求作用域bean从可重用组件中移除,并移动到调用其方法的组件类中。把我刚才描述的东西写进代码:

我们当前的状态是:

@Async("contextAwareTaskExecutor")
    public void asyncMethod() {
       reUsableCompoment.executeLogic() //This component uses the request scoped bean.
    }
重构代码:

    @Async("taskExecutor")
    public void asyncMethod(Object requestObject) {
       reUsableCompoment.executeLogic(requestObject); //Request scoped bean is removed from the component and moved to the component class which invokes it menthod.
    }

没有办法在子异步线程中获得请求作用域对象,因为原始的父请求处理线程可能已经向客户端提交了响应,并且所有请求对象都被销毁了。处理这种情况的一种方法是使用自定义作用域,如SimpleThreadScope。

SimpleThreadScope的一个问题是子线程不会继承父线程的作用域变量,因为它在内部使用简单的ThreadLocal。为了克服这个问题,实现一个与SimpleThreadScope完全相似的自定义作用域,但在内部使用InheritableThreadLocal。更多信息请点击此处Spring MVC:如何在派生线程中使用请求作用域bean ?

以上解决方案都不适用于我,因为在我的情况下,父线程响应请求返回给客户端,并且请求作用域对象不能在任何工作线程中引用。

我只是做了一个工作,使以上的事情工作。我正在使用Spring Boot 2.2,并使用customTaskExecutor与上面指定的ContextAwareCallable。

异步配置:

@Bean(name = "cachedThreadPoolExecutor")
public Executor cachedThreadPoolExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ContextAwarePoolExecutor();
    threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
    threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
    threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
    threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
    threadPoolTaskExecutor.setThreadNamePrefix("ThreadName-");
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;
}

ContextAwarePoolExecutor:

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
   @Override
   public <T> Future<T> submit(Callable<T> task) {
      return super.submit(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
   }
   @Override
   public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
     return super.submitListenable(new ContextAwareCallable(task, 
     RequestContextHolder.currentRequestAttributes()));
   }

}

创建自定义上下文感知可调用对象:

 public class ContextAwareCallable<T> implements Callable<T> {
   private Callable<T> task;
   private CustomRequestScopeAttributes customRequestScopeAttributes;
   private static final String requestScopedBean = 
  "scopedTarget.requestScopeBeanName";
   public ContextAwareCallable(Callable<T> task, RequestAttributes context) {
    this.task = task;
    if (context != null) {
       //This is Custom class implements RequestAttributes class
        this.customRequestScopeAttributes = new CustomRequestScopeAttributes();
        //Add the request scoped bean to Custom class       
        customRequestScopeAttributes.setAttribute
        (requestScopedBean,context.getAttribute(requestScopedBean,0),0);
        //Set that in RequestContextHolder and set as Inheritable as true 
       //Inheritable is used for setting the attributes in diffrent ThreadLocal objects.
        RequestContextHolder.setRequestAttributes
           (customRequestScopeAttributes,true);
     }
 }
   @Override
   public T call() throws Exception {
   try {
      return task.call();
    } finally {
        customRequestScopeAttributes.removeAttribute(requestScopedBean,0);
    }
   }
}

自定义类:

public class CustomRequestScopeAttributes implements RequestAttributes { 
  private Map<String, Object> requestAttributeMap = new HashMap<>();
  @Override
  public Object getAttribute(String name, int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST) {
        return this.requestAttributeMap.get(name);
    }
    return null;
}
@Override
public void setAttribute(String name, Object value, int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST){
        this.requestAttributeMap.put(name, value);
    }
}
@Override
public void removeAttribute(String name, int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST) {
        this.requestAttributeMap.remove(name);
    }
}
@Override
public String[] getAttributeNames(int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST) {
        return this.requestAttributeMap.keySet().toArray(new String[0]);
    }
    return  new String[0];
 }
 //Override all methods in the RequestAttributes Interface.
}

最后在所需的方法中添加Async注释。

  @Async("cachedThreadPoolExecutor")    
  public void asyncMethod() {     
     anyService.execute() //This Service execution uses request scoped bean
  }

使用Spring-boot-2.0.3。REALEASE/spring-web-5.0.7,我已经提出了下面的代码为@Async工作

保存ThreadLocal上下文的类。

import java.util.Map;
public class ThreadContextHolder {
  private ThreadContextHolder() {}
  private static final ThreadLocal<Map<String, Object>> ctx = new ThreadLocal<>();
  public static Map<String, Object> getContext() {
    return ctx.get();
  }
  public static void setContext(Map<String, Object> attrs) {
    ctx.set(attrs);
  }
  public static void removeContext() {
    ctx.remove();
  }
}

异步配置:

      @Bean
      public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       ...
       ...
        executor.setTaskDecorator(
            runnable -> {
              RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); // or currentRequestAttributes() if you want to fall back to JSF context.
              Map<String, Object> map =
                  Arrays.stream(requestAttributes.getAttributeNames(0))
                      .collect(Collectors.toMap(r -> r, r -> requestAttributes.getAttribute(r, 0)));
              return () -> {
                try {
                  ThreadContextHolder.setContext(map);
                  runnable.run();
                } finally {
                  ThreadContextHolder.removeContext();
                }
              };
            });
        executor.initialize();
        return executor;
      }

从async方法:

@Async
  public void asyncMethod() {
    logger.info("{}", ThreadContextHolder.getContext().get("key"));
  }

@Armadillo的回答激励我编写Runnable的实现。

TaskExecutor自定义实现:

/**
 * This custom ThreadPoolExecutor stores scoped/context information with the tasks.
 */
public class ContextAwareThreadPoolExecutor extends ThreadPoolTaskExecutor {
     @Override
    public Future<?> submit(Runnable task) {
        return super.submit(new ContextAwareRunnable(task, RequestContextHolder.currentRequestAttributes()));
    }
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        return super.submitListenable(new ContextAwareRunnable(task, RequestContextHolder.currentRequestAttributes()));
    }
}

Runnable自定义实现:

/**
 * This custom Runnable class can use to make background threads context aware.
 * It store and clear the context for the background threads.
 */
public class ContextAwareRunnable implements Runnable {
    private Runnable task;
    private RequestAttributes context;
    public ContextAwareRunnable(Runnable task, RequestAttributes context) {
        this.task = task;
        // Keeps a reference to scoped/context information of parent thread.
        // So original parent thread should wait for the background threads. 
        // Otherwise you should clone context as @Arun A's answer
        this.context = context;
    }
    @Override
    public void run() {
        if (context != null) {
            RequestContextHolder.setRequestAttributes(context);
        }
        try {
            task.run();
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    }
}

我解决了这个问题,添加了以下bean配置

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="request">
                <bean class="org.springframework.context.support.SimpleThreadScope"/>
            </entry>
        </map>
    </property>
</bean>

Update:上面的解决方案没有清理与spring文档中提到的线程相关的任何对象。这个方法适合我:https://www.springbyexample.org/examples/custom-thread-scope-module.html

@Armadillo

  1. 为我工作,谢谢。

  2. 至于Spring安全上下文,有更多的开箱即用的解决方案,它也为我工作(在这里找到如何设置Spring安全SecurityContextHolder策略?)

为了在子线程中使用SecurityContextHolder:

@Bean
public MethodInvokingFactoryBean methodInvokingFactoryBean() {
    MethodInvokingFactoryBean methodInvokingFactoryBean = new MethodInvokingFactoryBean();
    methodInvokingFactoryBean.setTargetClass(SecurityContextHolder.class);
    methodInvokingFactoryBean.setTargetMethod("setStrategyName");
    methodInvokingFactoryBean.setArguments(new String[]{SecurityContextHolder.MODE_INHERITABLETHREADLOCAL});
    return methodInvokingFactoryBean;
}

对于那些想在API中使用RequestScope和非阻塞I/O命令的人来说,这是一个相关的答案,而不是旋转驻留在原始HTTP请求之后的子线程。

SPRING ASYNC AWAIT请求范围

在Spring中实现一个自定义作用域是可能的,它将请求作用域对象存储在当前HttpServletRequest对象中,这样对象就可以在'await'语句之前和之后被访问:

  • 异步等待使用
  • 基于HttpServletRequest的RequestScope
  • 完整Java API代码示例

最新更新