Webflux:从流内对象添加订阅者上下文值



我有一个Webflux应用程序,我正在使用订阅者上下文来填充MDC值,以便它们向下游传播。我已经实现了这个项目中的类来处理订阅者之间的MDC传输,设置了一个webfilter来向传入的请求添加请求ID,并且可以在日志中看到请求ID作为MDC的一部分。

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.contextWrite(Context.of("requestId", UUID.randomUUID().toString()));
}

日志输出:

2021-09-30 17:15:29,963 [reactor-tcp-nio-2] INFO  c.p.l.s.RepoService:33 - MDC[requestId=ec0b68cf-ba4d-4c7f-afa4-f67fc97ebcbf] - Found user

请求需要从数据库中拉出一个用户记录,我需要添加用户的ID和电子邮件到上下文,一旦数据被获取,但我不知道如何从流中的对象添加值到上下文。

这就是我要做的:

public Mono<Response> doProcessing(String userId, Object object) {
return userRepo.findUserById(userId) //Returns UserEntity object
.flatMap(userEntity -> service.doMoreProcessing(userEntity, object))
.contextWrite(Context.of("userId", userId, "email", userEntity.getEmail()));
}

问题是,当我调用contextWrite()时,我不能访问userEntity对象,所以我不能使用该调用将用户的电子邮件添加到上下文中。

我试过使用Mono.deferredContextual()

public Mono<Response> doProcessing(String userId, Object object) {
return userRepo.findUserById(userId) //Returns UserEntity object
.flatMap(userEntity -> Mono.deferContextual(ctx -> Mono.just(userEntity).contextWrite(Context.of(ctx).putAll(Context.of("userId", userId, "email", userEntity.getEmail()).readOnly()))))
.flatMap(userEntity -> service.doMoreProcessing(userEntity, object));
}

…和.transformDeferredContextual()

public Mono<Response> doProcessing(String userId, Object object) {
return userRepo.findUserById(userId) //Returns UserEntity object
.transformDeferredContextual((userEntityMono, contextView) -> userEntityMono.flatMap(userEntity -> Mono.just(userEntity).contextWrite(Context.of(contextView).putAll(Context.of("userId", userId, "email", userEntity.getEmail()).readOnly()))))
.flatMap(userEntity -> service.doMoreProcessing(userEntity, object));
}

…但是都没用。如何将属于流程流一部分的对象中的数据添加到订阅者上下文?

上下文可以存储可变状态对象,如ConcurrentMap或更好的@Data注释类,可以在链上访问和修改(如果,您可以保证您不会修改链中的相同字段)。

要获得数据,您需要使用doOnEach从信号中获取数据,因为doOnNext不提供getContextView()

可以提供的信号值。你可以试试这样

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.contextWrite(Context.of("your-key", new ConcurrentHashMap<String, Object>()));
}
public Mono<Response> doProcessing(String userId, Object object) {
return userRepo.findUserById(userId)
.doOnEach(signal -> {
if (!signal.isOnNext()) return;
Optional<Map<String, Object>> optional = signal.getContextView().getOrEmpty("your-key");
optional.ifPresent(map -> {
map.put("userId", signal.get().getId());
map.put("email", signal.get().getEmail());
});
});
}

相关内容

  • 没有找到相关文章

最新更新