lambda中的Spring Request作用域bean



我有一个spring应用程序,它注入基于请求上下文的injexted中的某些bean。在这个例子中,它是Facebook bean。

@RestController
@RequestMapping("facebook")
public class FacebookInjectionController {
@Autowired
private Facebook facebook;
@Autowired
private UserRepository userRepository;
@RequestMapping(method = RequestMethod.GET)
public List<String> blah() {
String firstName = facebook.userOperations().getUserProfile().getFirstName();
return Arrays.asList(firstName);
}
@RequestMapping(method = RequestMethod.GET, value = "complex")
public List<String> blah2() {
UserJwt principal = (UserJwt) SecurityContextHolder.getContext().getAuthentication().getPrincipal();
Stream<User> stream = StreamSupport.stream(userRepository.findAll().spliterator(), true);
return stream.filter(u -> u.getUid().equals(principal.getUid()))
.map(u ->
facebook.userOperations().getUserProfile().getFirstName()
).collect(Collectors.toList());
}
}

此代码将正常运行,但每隔一段时间就会失败,并出现以下错误:

2017-02-09 01:39:59.133错误40802--[o-auto-1-exec-2]o.a.c.c.[.[./].[dispatcherServlet]:的Servlet.service()路径为[]的上下文中的servlet[dispatcherServlet]引发异常[请求处理失败;嵌套异常为org.springframework.beans.factory.BeanCreationException:错误正在创建名为"scopedTarget.facebook"的bean:Scope"request"为对于当前线程不活动;考虑定义作用域代理对于这个bean,如果您打算从singleton引用它;嵌套的异常为java.lang.IollegalStateException:没有线程绑定请求found:您引用的请求属性是否在实际的web请求,或处理原始请求之外的请求接收线程?如果您实际是在web请求中操作并且仍然收到此消息,则您的代码可能正在外部运行的DispatcherServlet/DispatcherPortlet:在这种情况下,使用RequestContextListener或RequestContextFilter来公开当前请求。]根本原因

java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131)
at org.springframework.web.context.request.AbstractRequestAttributesScope.get(AbstractRequestAttributesScope.java:41)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:340)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:187)
at com.sun.proxy.$Proxy137.userOperations(Unknown Source)
at com.roomsync.FacebookInjectionController.lambda$blah2$5(FacebookInjectionController.java:43)
at com.roomsync.FacebookInjectionController$$Lambda$10/2024009478.apply(Unknown Source)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

我尝试过多种解决方案(包括SpringMVC:如何在派生线程中使用请求范围的bean?),但都没有成功。

有没有一种方法可以将请求范围的bean传递给lambda或其他线程?

做什么https://stackoverflow.com/users/1262865/john16384说我已将配置更改为:

@Bean
@Scope(value = "inheritableThreadScope", proxyMode = ScopedProxyMode.INTERFACES)
public ConnectionRepository connectionRepository(ConnectionFactoryLocator connectionFactoryLocator) {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication == null) {
throw new IllegalStateException("Unable to get a ConnectionRepository: no user signed in");
}
return getUsersConnectionRepository(connectionFactoryLocator).createConnectionRepository(authentication.getName());
}
@Bean
@Scope(value="inheritableThreadScope", proxyMode=ScopedProxyMode.INTERFACES)
public Facebook facebook(ConnectionFactoryLocator connectionFactoryLocator) {
Connection<Facebook> connection = connectionRepository(connectionFactoryLocator).findPrimaryConnection(Facebook.class);
return connection != null ? connection.getApi() : null;
}
@Bean
@Scope(value = "inheritableThreadScope", proxyMode = ScopedProxyMode.INTERFACES)
public ExecutorService fbExecutor () {
return Executors.newSingleThreadExecutor();
}

控制器现在看起来像:

@RestController
@RequestMapping("facebook")
public class FacebookInjectionController {
@Autowired
private Facebook facebook;
@Autowired
private UserRepository userRepository;
@Autowired
private ExecutorService fbExecutor;
@RequestMapping(method = RequestMethod.GET)
public List<String> blah() {
String firstName = facebook.userOperations().getUserProfile().getFirstName();
return Arrays.asList(firstName);
}
@RequestMapping(method = RequestMethod.GET, value = "complex")
public List<String> blah2() throws ExecutionException, InterruptedException {
UserJwt principal = (UserJwt) SecurityContextHolder.getContext().getAuthentication().getPrincipal();
Stream<User> stream = StreamSupport.stream(userRepository.findAll().spliterator(), true);
Future<List<String>> submit = fbExecutor.submit(() -> stream.filter(u -> u.getUid().equals(principal.getUid()))
.map(u ->
facebook.userOperations().getUserProfile().getFirstName()
)
.collect(Collectors.toList()));
return submit.get();
}
}

我还有以下配置:

@Configuration
public class BeanFactoryConfig implements BeanFactoryAware {
private static final Logger LOGGER = Logger.getLogger(BeanFactoryConfig.class);
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
if (beanFactory instanceof ConfigurableBeanFactory) {
//            logger.info("MainConfig is backed by a ConfigurableBeanFactory");
ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;
/*Notice:
*org.springframework.beans.factory.config.Scope
* !=
*org.springframework.context.annotation.Scope
*/
org.springframework.beans.factory.config.Scope simpleThreadScope = new SimpleThreadScope() {
@Override
public void registerDestructionCallback(String name, Runnable callback) {
RequestAttributes attributes = RequestContextHolder.currentRequestAttributes();
attributes.registerDestructionCallback(name, callback, 3);
}
};
cbf.registerScope("inheritableThreadScope", simpleThreadScope);
/*why the following? Because "Spring Social" gets the HTTP request's username from
*SecurityContextHolder.getContext().getAuthentication() ... and this
*by default only has a ThreadLocal strategy...
*also see https://stackoverflow.com/a/3468965/923560
*/
SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
}
else {
//            logger.info("MainConfig is not backed by a ConfigurableBeanFactory");
}
}
}

即使这样,它有时也会出现错误:

{
"timestamp": 1486686875535,
"status": 500,
"error": "Internal Server Error",
"exception": "java.util.concurrent.ExecutionException",
"message": "org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.facebook' defined in class path resource [com/roomsync/config/SocialConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.social.facebook.api.Facebook]: Factory method 'facebook' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.connectionRepository': Scope 'inheritableThreadScope' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.",
"path": "/facebook/complex"
}

因此,我似乎仍然缺少激活作用域并将线程本地上下文复制到它的部分

发生了两件事:

1) Java流使用一个通用的Fork/Join池来并行执行事务。这些线程不是由Spring框架(或您)创建的。

2) 通过使用ThreadLocal来支持请求范围的bean。

这意味着,如果一个不是由Spring创建的线程试图访问一个请求范围的bean,它将不会被找到,因为该线程不知道它(它不在ThreadLocal中)。

为了解决这个问题,您需要控制哪些线程用于流。一旦实现了这一点,就可以制作一个请求范围的bean的副本,用于子线程。您还需要在线程完成任务后再次清理它们,否则您可能会留下bean,这可能会被该线程上执行的下一个任务看到。

要更改并行流使用的线程,请参阅:Java 8并行流中的自定义线程池

我认为如何正确配置Spring,将请求范围的bean传播到您已经找到的子线程。

这就是我在fork-joined线程中传输请求bean的方法。该示例仅用于说明。

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
// org.slf4j:slf4j-api:1.7.30
import org.slf4j.MDC;
// org.springframework:spring-web:5.2.12.RELEASE
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
class Scratch {
public static void main(String[] args) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
Map<String, String> contextMap = MDC.getCopyOfContextMap();
List<String> list = new ArrayList<>();
list.parallelStream().map(id -> {
try {
// copy all required for spring beans
RequestContextHolder.setRequestAttributes(context);
MDC.setContextMap(contextMap);
// ************************************
// Spring request beans usage goes here
// ************************************
return 1;
} finally {
// clean all from thread local
MDC.clear();
RequestContextHolder.resetRequestAttributes();
}
})
.collect(Collectors.toList());
}
} 

是否需要并行处理流?这导致lambda可能在另一个线程中执行。

Stream Stream=StreamSupport.Stream(userRepository.findAll().spliterator(),false);

我遇到了同样的问题,我试图使用并行流从Kubernetes REST API获取作业信息,因为并行流使用了新的Threads,正如John16384所解释的那样,我的代码无法获得'scopedTarget.oauth2ClientContext',因为它的范围是Spring中的请求,而并行流创建的线程无法访问它。所以我不得不像下面这样更改它;

old version: items.parallelStream().map(jobItem -> createJobObject(jobItem, createJobTrigger(jobItem))).collect(Collectors.toList());
fixed version: items.stream().map(jobItem -> createJobObject(jobItem, createJobTrigger(jobItem))).collect(Collectors.toList());

在createJobObject方法中,我调用了一个REST服务

restTemplate.getForEntity(url, KubernetesJob.class).getBody().getItems();

最新更新