Spring Sleuth - JMS ErrorHandler 上的中断跟踪



我有一个简单的示例 https://github.com/gtiwari333/sleuth-jms-broken-tracing/tree/master,它将Spring Sleuth与JMS一起使用。

在这里,对/jms端点的调用将消息排队,并且在onMessage方法收到消息时,我们正在对/test进行GET调用并抛出MyException。我们希望跟踪 id 传递到ErrorHandler,以便在日志中看到/jmsonMessage()handleError()/test终结点之间的相同 traceId。

我现在得到的/如何得到错误:

我运行了该应用程序并点击了localhost:8080/jms端点。在下面的日志中,TraceId 不会在类JmsListenerErrorHandler传播,并且为 GET 调用创建了一个新的 TraceId/test

2020-08-04 17:55:24.212  INFO [,225c47fb814f6584,225c47fb814f6584,true] 16956 --- [nio-8080-exec-1] sleuth.SleuthApplication                 : Queuing message ...
2020-08-04 17:55:24.282  INFO [,225c47fb814f6584,eac851f1650ae8a6,true] 16956 --- [enerContainer-1] sleuth.SleuthApplication                 : JMS message received SOME MESSAGE !!!
2020-08-04 17:55:24.321  INFO [,225c47fb814f6584,612a7956f6b29a01,true] 16956 --- [nio-8080-exec-3] sleuth.SleuthApplication                 : test1 called  
<<<<<<<<< FINE UPTO HERE
2020-08-04 17:55:24.332  INFO [,,,] 16956 --- [enerContainer-1] sleuth.SleuthApplication                 : handling error by calling another endpoint ..     
<<<<<<<<< new thread started and lost tracing
2020-08-04 17:55:24.336  INFO [,4c163d0997076729,4c163d0997076729,true] 16956 --- [nio-8080-exec-2] sleuth.SleuthApplication                 : test1 called  
<<<<<<<<< new trace id received

看起来 JMS 处理新线程中新消息的接收/处理。侦探具有必要的"仪器"逻辑来拦截和传播跟踪/跨度 id 以@JmsListener代码,但它不会传播到org.springframework.util.ErrorHandler

  • org.springframework.jms.listener.DefaultMessageListenerContainer.AsyncMessageListenerInvoker
  • org.springframework.jms.listener.AbstractPollingMessageListenerContainer#doReceiveAndExecute

守则:

@RestController和@JmsListener:

@RestController
static class Ctrl {
@Autowired RestTemplate restTemplate;
@Autowired JmsTemplate jmsTemplate;
@GetMapping("/test")
void test() {
log.info("test1 called");
}
@GetMapping("/jms")
void jms() {
log.info("Queuing message ...");
jmsTemplate.convertAndSend("test-queue", "SOME MESSAGE !!!");
}
@JmsListener(destination = "test-queue", concurrency = "5")
void onMessage(TextMessage message) throws JMSException {
log.info("JMS message received {}", message.getText());
restTemplate.getForEntity("http://localhost:8080/test", Void.class); //-->it works
throw new MyException("Some Error");  //-->it doesn't
}
static class MyException extends RuntimeException {
public MyException(String msg) { super(msg); }
}
}

错误处理程序:

@Component
static class JmsListenerErrorHandler implements ErrorHandler {
@Autowired RestTemplate restTemplate;
@Override
public void handleError(Throwable t) {
log.info("handling error by calling another endpoint .."); //1....tracing is lost here
restTemplate.getForEntity("http://localhost:8080/test", Void.class);
}
}

JMS 配置:

@Configuration
@EnableJms
static class ActiveMqConfig implements JmsListenerConfigurer {
@Autowired ErrorHandler jmsListenerErrorHandler;
@Autowired ConnectionFactory connectionFactory;
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setContainerFactory(containerFactory());
}
@Bean
JmsListenerContainerFactory<?> containerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(jmsListenerErrorHandler);
return factory;
}
}

我尝试过:(使其成为一个完整的SO问题)

它在公关中:https://github.com/gtiwari333/sleuth-jms-broken-tracing/pull/1/files 在这里,我尝试使用创建一个由LazyTraceThreadPoolTaskExecutor包装的自定义执行器bean,并尝试将其传递给JmsListenerContainerFactory

它适用于正常的线程执行,但不适用于 JMS 的东西。

executor.execute(() -> log.info("Im inside thread 2")); //it works

有没有人已经想出了如何拦截错误处理程序来传递 TraceId?

有一个关于@JmsListener仪器的未决问题。所以我想目前不支持。

一个可能的解决方案是在异常中传递Span

@RestController
static class Ctrl {
@Autowired
private Tracer tracer;
// ...
@JmsListener(destination = "test-queue", concurrency = "5")
void onMessage(TextMessage message) throws JMSException{
//..
throw new MyException("Some Error",tracer.currentSpan()); // <-- pass current span
}
}

所以你可以在JmsListenerErrorHandler得到它:

@Override
public void handleError(Throwable t) {
if(t.getCause() instanceof MyException){
MyException mEx = (MyException) t.getCause();
log.info("Failing span: {}",mEx.getSpan());
}
//...
}

MyException类:

class MyException extends RuntimeException {
private final Span span;
public MyException(String msg, Span span) {
super(msg);
this.span=span;
}
// Getter for the Span
}

最新更新