Spring Data未保存实体



我有一个问题,实体不被保存的Spring数据。应用程序的逻辑是这样的:

  1. 另一个应用程序正在监听一个负载非常重的Kafka主题(每秒数十条消息),并以'NEW'状态将消息插入数据库中的表中。
  2. @Scheduled方法加载一个具有'NEW'状态的实体列表,这些实体被一个接一个地转移到FixedThreadPool(20个线程),它们的状态被设置为'PROCESSING',并且在同一个表上调用saveAll方法。
@Scheduled(fixedDelay = 10_000)
public void scheduled() {
Pageable pageable = PageRequest.of(0, pageSize);
List<EntityClass> entities = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
while (!entities.isEmpty()) {
entities.forEach(entity -> {
entity.setStatus(PROCESSING_STATUS);
process(entity);
});
entityService.saveAll(entities);
loggers = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
}
private void process(Entity entity) {
threadPool.execute(
() -> processor.execute(
TaskBuilder.entity(entity).build()
)
);
}
  1. 在FixedThreadPool的线程中,一些业务逻辑通过TaskProcessor的几个类发生,导致每个实体的状态更改为'OK'或'ERROR'。
  2. 业务逻辑的最后一步是将具有结果状态的实体更新到它被池出的同一表,这就是我遇到问题的地方:实体的一部分不会持久并永远保持在"处理"状态。从最后一步的最后一部分开始:
...
log.info("[SaveStep] [execute] status before {}", entity.getStatus());
Entity entity = entityService.save(entity);
log.info("[SaveStep] [execute] status after {}", entity.getStatus());

两个日志方法都显示正确的状态('OK'或'ERROR'),但数据库行一直包含'PROCESSING'状态。我试图重做一个具有多种变化的entityService.save() (entityRepository是一个常规的jparerepository):

  1. 常规的repo.save()不起作用:
public Entity save(Entity entity) {
return repository.save(logger);
}
  1. Get, refresh and save不工作:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
saved.setStatus(entity.getStatus());
return repository.save(saved);
}
  1. 我有一个想法,一个Hikari池大小可能是一个问题,但增加到最大池大小=20,甚至30没有做一件事。
  2. 如果我用记录器环绕选项2,我在两个记录器行中都有正确的状态,但不在数据库中:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
log.info("[Service] [save] status before {}", saved.getStatus());
saved.setStatus(entity.getStatus());
log.info("[Service] [save] status after {}", saved.getStatus());
return repository.save(saved);
}
  1. 我尝试过本地查询,但没有成功:
@Modifying
@Transactional
@Query(value = "UPDATE entity_table_name set status = :status where id = :id", nativeQuery = true)
void updateStatus(@Param("id") Long id, @Param("status") String status);
  1. 我已经尝试用repo.saveAndFlush()代替repo.save(),但是没有效果。

目前我没有办法去尝试。最令人不快的是,大多数实体最终都被数据库正确地消化了,而只有大约20%的被save()忽略了。没有抛出任何异常。

您正在将消息设置为"正在处理";在提交到池之后,这意味着池可能会先处理它,然后进行"处理"。之后可能会更新。我不确定这是不是你的问题,但就你现在做的事来说,这是有可能的。尝试设置和保存/提交"处理";在提交到池之前更新。

感谢Taylor,他完全正确地指出了我的问题。实体列表已被并行处理并以正确的状态保存,甚至之前也足够快。调度程序碰到saveAll('PROCESSING')行,这会导致状态重写。

最新更新