Spring重试模板阻止了我的响应队列



我正在尝试实现什么我有一个来自UI的REST调用,它调用以添加用户。因此,用户将不得不执行异步队列(这是一个约束(,但随后等待响应队列一段配置的时间,并在结果发送回UI之前对其进行处理。如果队列返回的引用号为空,那么我必须删除用户记录并抛出异常,说用户无效。如果响应返回了有效的引用(或者超时发生(,那么我认为它是有效的,并返回成功。

我有一个应用程序,我在其中发送一条队列消息来获取我的用户对象的referenceNumber。然后等待队列响应,然后再响应REST调用。但是,我必须等待配置的时间,等待队列响应返回。

UserManagerImpl

// REST CALL to persist
public User Persist(User user) {
...
...
// Building the message for sending to QUEUE
UserEnvelopeV1_0 userEnvelope =buildUserEnvelope(user);
// This is the place i send the queue message
userQueueClient.send(userEnvelope);
// Update Request time
updateRequestDetails(user.getUserId);
// This is the call i am going retry
boolean userValid = userRetryTemplate.doUserReferenceRetry(userId);
if (!userValid ) {
//remove User Object
throw Exception
}
...
}
// update the request time for reference Number
private void updateRequestDetails(String userId) {
User user = userRepository.findById(userId);
if (user != null) {
user.setRefRequestDateItem(DateHelper.createXMLGregorianCalendar());
userRepository.saveAndFlush(user);
}
public void updateReference(String userId, String referenceNumber) {
User user = userRepository.findById(userId);
if (user != null) {
user.setReference(referenceNumber);
user.setResponseDate(DateHelper.createXMLGregorianCalendar());
userRepository.saveAndFlush(user);
}
}

UserQueueClient:

@Component
public class UserQueueClient {

@JmsListener(id = "#{T(java.util.UUID).nameUUIDFromBytes('${in.res}",
destination = "${in.res}", containerFactory = "containerFactory")
public void receive(Message message, UserEnvelopeV1_0 envelope) throws{

try {
String userId = envelope.getHeader().getMessageIdentification().getUserId();
ApplicationInformationStructure applicationInformation = envelope.getBody().getApplicationInformation();
if(CollectionUtils.isNotEmpty(applicationInformation.getApplicationInformationResult())) {
String referenceNumber = applicationInformation.getApplicationInformationResult().getRefNumber();      
userManager.updateReference(userId, referenceNumber);
}
} catch (Exception e) {
//
}
}
@Transactional(propagation = Propagation.MANDATORY)
public void send(UserEnvelopeV1_0 sarsSoapEnvelope) throws JMSException {

envelope.setHeader();
Message message = sendToQueue(envelope, requestQueue, responseQueue,
userId);
applicationEventPublisher.publishEvent(new MessageLogEvent("USER_GET_REF_NUMBER", message, MessageType.XML,
requestQueue, MessageDirection.SEND, true, false, new Date(), userId));
}
}

UserRetryTemplate


@Component
public class UserRetryTemplate {

@Value("${retry.max.attempts:5}")
private int maxAttempts;
@Value("${response.waiting.time.in.seconds:60}")
private long maxDelay;
@Autowired
private UserRepository userRepository;
private static final long INITIAL_INTERVAL = 2000L;
public RetryTemplate retryTemplate() {
// Max timeout in milliseconds
long maxTimeout = maxDelay*1000;
//double multiplier = (maxTimeout - INITIAL_INTERVAL)/((maxAttempts-2)*6000);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(maxAttempts);

FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(maxTimeout/(maxAttempts-1));
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy);
template.setBackOffPolicy(backOffPolicy);
return template;
}
public boolean doUserReferenceRetry(String userId) {
boolean isUserReferenceValid = true;
try {
boolean isValidUser = retryTemplate().execute(context -> {
logger.info("Attempted {} times", context.getRetryCount());
User user = userRepository.findById(userId);
logger.info("User Retry :" + user);
if (user.getResponseDateItem() == null || user.getReferenceNumber == null) {
logger.info("response not yet received");
throw new IllegalStateException("User Response not yet received");
}
if (user.getReferenceNumber != null)) {
return true;
}
throw new IllegalStateException("Response not yet received");
});
return isUserReferenceValid ;
} catch (IllegalArgumentException e) {
}
return true;
}
}

因此,我实现了一个逻辑,在该逻辑中,我将发送队列消息并执行Spring重试(针对配置的时间(,以检查数据库中是否更新了referenceNumber。此外,当队列响应返回时,我将使用referenceNumber更新DB。

但是,当我实现上述逻辑时,spring重试会一直重试到配置的时间,但我的spring应用程序不会处理任何响应队列。有没有一种方法可以让Spring应用程序并行运行这两个进程。

问题是,如果我删除spring重试机制,响应队列将处理我的响应,并使用参考号更新User记录。

但是,当我添加重试逻辑时,响应队列不再处理我的队列。

我发现下面的内容令人困惑。

"在那里,我将发送队列消息并进行Spring重试(针对配置的时间(,以检查数据库中是否更新了referenceNumber。此外,当队列响应返回时,我将使用referenceNumber更新数据库。">

在一行中,您说您正在等待更新参考号,在另一行中您说您在更新数据库。谁是这里的制作人?有两条不同的线吗?生产者和消费者在这种情况下你。

如果要在配置的时间内阻止当前线程您可以考虑使用轮询(长超时,TimeUnit单位(方法阻塞队列吗

poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout

请编辑有足够详细信息的问题。

最新更新