InvalidAddress JMSException with SQSSession when using temporary credentials



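I'm getting an error when trying to connect via JMS to an SQS queue in another AWS account. I tried to follow the approach taken in this answer, but I get the following error: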

com.amazonaws.services.sqs.model.AmazonSQSException: The address https://sqs.us-east-1.amazonaws.com/ is not valid for this endpoint. (Service: AmazonSQS; Status Code: 404; Error Code: InvalidAddress; Request ID: d7f72bd3-6240-5f63-b313-70c2d8978c14; Proxy: null)

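Unlike the post mentioned above (where I believe the account credentials were in the default provider chain?), I'm trying to assume a role that has access to this SQS queue. Is this not possible through JMS, or am I doing something wrong?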

import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;

/**
 * A configuration class for JMS to poll an SQS queue
 * in another AWS account
 */
@Configuration
public class TranslationJmsConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(TranslationJmsConfig.class);

    @Value("${iam.connection.arn}")
    private String connectionRoleArn;

    @Value("${account.id}")
    private String brokerAccountId;

    /**
     * JmsListenerContainerFactory bean for translation processing response queue
     *
     * @param concurrentConsumers number of concurrent consumers
     * @param maxConcurrentConsumers max number of concurrent consumers
     * @return An instance of JmsListenerContainerFactory
     */
    @Bean("translationJmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory translationJmsListenerContainerFactory(
            @Value("#{new Integer('${listener.concurrency}')}") int concurrentConsumers,
            @Value("#{new Integer('${listener.max.concurrency}')}") int maxConcurrentConsumers) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(getConnectionFactory(connectionRoleArn));
        factory.setDestinationResolver(new SqsDynamicDestinationResolver(brokerAccountId));
        factory.setSessionTransacted(false);    //SQS does not support transaction.
        // Automatic message acknowledgment after successful listener execution;
        // best-effort redelivery in case of a user exception thrown as well as in case
        // of other listener execution interruptions (such as the JVM dying).
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setConcurrency(String.format("%d-%d", concurrentConsumers, maxConcurrentConsumers));
        return factory;
    }

    /**
     * create custom JMS Template
     * @return JmsTemplate
     */
    @Bean
    public JmsTemplate customJmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(getConnectionFactory(connectionRoleArn));
        jmsTemplate.setDestinationResolver(new SqsDynamicDestinationResolver(brokerAccountId));
        return jmsTemplate;
    }

    /**
     * A dynamic destination resolver for sqs queue
     */
    public class SqsDynamicDestinationResolver extends DynamicDestinationResolver {
        private final String brokerAccountId;

        /**
         * Constructor
         * @param brokerAccountId broker Account Id
         */
        public SqsDynamicDestinationResolver(String brokerAccountId) {
            this.brokerAccountId = brokerAccountId;
        }

        @Override
        protected Queue resolveQueue(Session session, String queueName) throws JMSException {
            if (session instanceof SQSSession) {
                SQSSession sqsSession = (SQSSession) session;
                return sqsSession.createQueue(queueName, brokerAccountId); // 404 invalid address -- Something wrong with creds?
            }
            return super.resolveQueue(session, queueName);
        }
    }

    private ConnectionFactory getConnectionFactory(String connectionRoleArn) {
        AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
                .build();
        // assume the connector account credentials -> so we can assume customer account using chaining
        AWSCredentialsProvider dummyCredentialProviders =
                IdentityHelpers.assumeInternalRole(stsClient, connectionRoleArn); // A helper that assumes temporary creds
        return new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard()
                        .withRegion(Regions.US_EAST_1)
                        .withCredentials(dummyCredentialProviders)
        );
    }
}

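I realized that when using temporary credentials, I don't need the second argument (the account ID) in the sqsSession.createQueue call. Once I changed

return sqsSession.createQueue(queueName, brokerAccountId);

to:

return sqsSession.createQueue(queueName);

it worked fine. I think I misunderstood what the account ID is needed for. I assumed the parameter was for the case where you have multiple accounts in your provider chain and want it to look in a specific one. Any light on this would still be appreciated!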
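
For anyone who wants to keep both code paths, here is a minimal sketch of a guard that could sit in front of the two createQueue overloads. It assumes, and the post does not establish this, that a blank or malformed account.id value is worth ruling out before it is passed as the queue owner. QueueOwnerIds and normalize are hypothetical names, not part of the SQS JMS library. The only fact relied on is that AWS account IDs are 12-digit numbers.

```java
import java.util.Optional;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of the SQS JMS library): decides whether a
 * configured value is usable as a queue-owner account ID.
 */
public final class QueueOwnerIds {
    // AWS account IDs are 12-digit numbers.
    private static final Pattern ACCOUNT_ID = Pattern.compile("\\d{12}");

    private QueueOwnerIds() {
    }

    /**
     * Returns the trimmed account ID if it is exactly 12 digits,
     * or Optional.empty() for null, blank, or malformed input.
     */
    public static Optional<String> normalize(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        String trimmed = raw.trim();
        return ACCOUNT_ID.matcher(trimmed).matches()
                ? Optional.of(trimmed)
                : Optional.empty();
    }
}
```

In resolveQueue, the result could pick the overload: if QueueOwnerIds.normalize(brokerAccountId) is present, call sqsSession.createQueue(queueName, id), otherwise fall back to sqsSession.createQueue(queueName). This only narrows down the problem. It does not explain why the two-argument call failed with the assumed-role credentials.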
