使用vertx EventBus时传播线程上下文



我正在尝试在使用事件总线时使上下文传播正常工作。我按照本指南创建了一个简单的流程https://quarkus.io/guides/reactive-event-bus.

代码看起来像这样:

@Inject
EventBus bus;
@Inject
@ManagedExecutorConfig(propagated = "SLF4J_MDC")
ManagedExecutor managedExecutor;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> hello(String name) {
    MDC.put("key", "someValue");
    Context context = Vertx.currentContext();
    ContextLocals.put("key", "Local Context");
    return bus.<String>request("greeting", name)
            .emitOn(managedExecutor)      // I also tried with - context::runOnContext
            .runSubscriptionOn(managedExecutor)
            .onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
@CurrentThreadContext(propagated = {"SLF4J_MDC"})
public String greeting(String name) {
    log.info("MDC2 ->>> {}", MDC.get("key"));                            // null
    log.info("from Context Local {}", ContextLocals.<String>get("key")); // null
    return "Hello " + name.toUpperCase();
}

SLF4J_MDC是一个简单的ThreadContentProvider实现,我已经注册它来传播MDC值。我已经用普通的异步代码测试了这个机制,它有效,但没有用eventBus。

Endpoint按预期工作,但我试图记录的值为null。

在这种情况下,使用上下文传播的正确方法是什么?它是否可能或不打算与事件总线请求一起使用?

您可以参考-https://reactiverse.io/reactiverse-contextual-logging/

上下文数据不会通过EventBus传播。在这种情况下,它必须是:

  1. 添加到发送方的邮件头中
  2. 从标头中检索并再次保存在接收方

此过程可以通过EventBus拦截器实现自动化:

vertx.eventBus().addOutboundInterceptor(event -> {
  String requestId = ContextualData.get("requestId");
  if (requestId != null) {
    event.message().headers().add("requestId", requestId);
  }
  event.next();
});
vertx.eventBus().addInboundInterceptor(event -> {
  String requestId = event.message().headers().get("requestId");
  if (requestId != null) {
    ContextualData.put("requestId", requestId);
  }
  event.next();
});

最新更新