Starting JPA interactions from a reactive AMQP background thread



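We have a regular, blocking Spring Boot (2.5.4) application that serves a REST interface. We use Hibernate Envers for audit logging. The service sends messages on our Service Bus asynchronously through the Azure SDK's ServiceBusSenderAsyncClient. The service now also needs to receive messages from that Service Bus and write the updates to the database, again asynchronously. For this we use the ServiceBusReceiverAsyncClient with reactive streams. All of this generally works (I wrap the whole database interaction in a @Transactional method), but I run into a problem with Envers: the beforeTransactionCompletion callback fails with null (when I deactivate Envers, it works). As far as I understand, the problem is independent of the Service Bus and really revolves around accessing JPA repositories from a background thread. Here is a schematic implementation: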

ServiceBusReceiveController

    @Component
    public class SubscriptionReceiveController {

        @Autowired
        private MyService myService;

        @Autowired
        private ServiceBusReceiverAsyncClient receiver;

        @Autowired
        private ObjectMapper mapper;

        @PostConstruct
        public void run() {
            // continuously receive messages in the background
            receiver.receiveMessages().flatMap(message -> {
                MyObject object;
                try {
                    // deserialize into the same type that is declared above
                    object = mapper.readValue(message.getBody().toString(), MyObject.class);
                } catch (JsonProcessingException e) {
                    throw new JsonProcessingRuntimeException("Failed to convert message to object", e);
                }
                // wrap the blocking JPA interaction
                Mono<MyObject> blockingWrapper = Mono.fromCallable(() -> {
                    return myService.updateMyObject(object.getId());
                });
                // When the service returns, this is where it crashes
                return blockingWrapper.subscribeOn(Schedulers.boundedElastic());
            }).flatMap(response -> {
                // More processing steps
            }).subscribe();
        }
    }

MyService

    @Service
    public class MyService {

        @Autowired
        private MyRepository myRepository;

        @Transactional
        public MyObject updateMyObject(UUID objectId) {
            var object = myRepository.getById(objectId);
            object.setSomeProperty("myPropertyValue");
            return myRepository.save(object);
        }
    }

This results in the following error message:

org.springframework.orm.jpa.JpaSystemException: Unable to perform beforeTransactionCompletion callback: null; nested exception is org.hibernate.HibernateException: Unable to perform beforeTransactionCompletion callback: null
Caused by: org.hibernate.HibernateException: Unable to perform beforeTransactionCompletion callback: null
at org.hibernate.engine.spi.ActionQueue$BeforeTransactionCompletionProcessQueue.beforeTransactionCompletion(ActionQueue.java:960) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.hibernate.engine.spi.ActionQueue.beforeTransactionCompletion(ActionQueue.java:525) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2381) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:448) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:562) ~[spring-orm-5.3.9.jar:5.3.9]
... 20 common frames omitted
...
Caused by: java.lang.NullPointerException: null
at com.my.package.revision.AuditRevisionListener.newRevision(AuditRevisionListener.java:10) ~[classes/:na]
at org.hibernate.envers.internal.revisioninfo.DefaultRevisionInfoGenerator.generate(DefaultRevisionInfoGenerator.java:88) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
at org.hibernate.envers.internal.synchronization.AuditProcess.getCurrentRevisionData(AuditProcess.java:133) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
at org.hibernate.envers.internal.synchronization.AuditProcess.executeInSession(AuditProcess.java:115) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
at org.hibernate.envers.internal.synchronization.AuditProcess.doBeforeTransactionCompletion(AuditProcess.java:174) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
at org.hibernate.envers.internal.synchronization.AuditProcessManager$1.doBeforeTransactionCompletion(AuditProcessManager.java:47) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
at org.hibernate.engine.spi.ActionQueue$BeforeTransactionCompletionProcessQueue.beforeTransactionCompletion(ActionQueue.java:954) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
... 28 common frames omitted

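What am I missing here, and why does the transaction not complete fully inside the service implementation? Is there a better way or pattern for writing to the database through JPA from a background thread?

Thanks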

The NullPointerException is thrown at com.my.package.revision.AuditRevisionListener.newRevision(AuditRevisionListener.java:10). Isn't that unrelated to the transaction?
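One plausible reading of that stack trace, assuming the AuditRevisionListener (not shown in the question) reads some thread-bound context such as the current request or the logged-in user: values stored in a ThreadLocal on one thread are simply not visible on a worker thread. The sketch below uses only the JDK; CURRENT_USER is a hypothetical stand-in for a holder like SecurityContextHolder, not the author's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalContextDemo {
    // Hypothetical stand-in for a thread-bound holder (assumption, not from the question).
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Reads the holder on a separate worker thread, much like a scheduler worker would.
    static String readOnWorkerThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(CURRENT_USER::get, worker).join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_USER.set("alice"); // bound to the calling thread only
        System.out.println("caller thread sees: " + CURRENT_USER.get());
        System.out.println("worker thread sees: " + readOnWorkerThread());
    }
}
```

The caller thread prints alice while the worker thread prints null. A listener that dereferences such a value without a null check would fail in the same way during commit, which would explain why the error surfaces inside the transaction even though the transaction handling itself is fine.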

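I ran into the same problem when using a RevisionListener. As described in this issue, there seems to be a lack of communication between the Spring framework and Hibernate Envers: https://github.com/spring-projects/spring-data-envers/issues/249.

Even after annotating my CustomRevisionListener with @Service, Spring could not find the bean and create an instance of it.

I tried all the solutions listed here: How to inject a Spring bean into Hibernate Envers

However, only the solution that uses SecurityContextHolder (solution 3) worked for me. I only had to check whether the entity representing the logged-in user has an Id property, and populate it with the user Id retrieved from the database.

Here is my code: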
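    import java.util.Date;
    import java.util.Optional;

    import org.hibernate.envers.RevisionListener;
    import org.springframework.security.core.Authentication;
    import org.springframework.security.core.context.SecurityContext;
    import org.springframework.security.core.context.SecurityContextHolder;

    public class MyRevisionListener implements RevisionListener {

        @Override
        public void newRevision(Object entity) {
            MyRevisionEntity revisionEntity = (MyRevisionEntity) entity;
            // Resolves to null when no user is authenticated, e.g. on a background thread
            Long loggedUserId = Optional.ofNullable(SecurityContextHolder.getContext())
                    .map(SecurityContext::getAuthentication)
                    .filter(Authentication::isAuthenticated)
                    .map(Authentication::getPrincipal)
                    // skip principals of another type (e.g. anonymous) instead of failing the cast
                    .filter(JwtUser.class::isInstance)
                    .map(JwtUser.class::cast)
                    .map(JwtUser::getId)
                    .orElse(null);
            revisionEntity.setUserId(loggedUserId);
            revisionEntity.setDate(new Date());
        }
    }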
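For messages that arrive on a background thread there is no logged-in user, so a listener like this one records a null user Id. If a fixed technical identity should be recorded instead, one option is to bind the context inside the wrapped blocking call and clear it afterwards. The following is a JDK-only sketch of that idea; CURRENT_USER, withUser, runOnWorker and the "service-bus" name are all hypothetical and stand in for the real holder and scheduler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerContextSketch {
    // Hypothetical stand-in for a thread-bound holder (assumption, not a Spring API).
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Wraps a task so the given user is bound only while the task runs.
    static <T> Callable<T> withUser(String user, Callable<T> task) {
        return () -> {
            CURRENT_USER.set(user);
            try {
                return task.call();
            } finally {
                CURRENT_USER.remove(); // pooled threads are reused, so always clean up
            }
        };
    }

    // Runs the task on a separate worker thread and returns its result.
    static <T> T runOnWorker(Callable<T> task) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a listener reading the current user during commit.
        Callable<String> readUser = CURRENT_USER::get;
        System.out.println(runOnWorker(readUser));
        System.out.println(runOnWorker(withUser("service-bus", readUser)));
    }
}
```

The first call prints null and the second prints service-bus. In the question's setup, the equivalent wrapper would go around the body passed to Mono.fromCallable, and with Spring Security the set and remove steps would presumably map to SecurityContextHolder.setContext and SecurityContextHolder.clearContext.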
