使用sqs-consumer检测SQS的receiveMessage事件是否可扩展



我使用AWS SQS作为消息队列。通过sqs.sendMessage发送数据后,我想以可扩展的方式(通过无限循环或事件触发)检测sqs.receiveMessage。后来我发现了sqs-consumer,它可以在收到消息的那一刻处理sqs.receiveMessage事件。但是我想知道,这是处理微服务之间消息传递的最合适方法,还是有其他更好的方法来处理此操作?

我已经在Java中编写了代码,使用AmazonSQSBufferedAsyncClient从SQS队列中获取数据。使用此API的优点是消息以异步模式进行缓冲。

/**
 * 
 */
package com.sxm.aota.tsc.config;
import java.net.UnknownHostException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.retry.RetryPolicy.BackoffStrategy;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import com.amazonaws.services.sqs.buffered.QueueBufferConfig;
/**
 * Spring configuration that exposes a buffered, asynchronous Amazon SQS client bean.
 *
 * <p>The client is built from a shared {@link ClientConfiguration} (retry policy,
 * TCP keep-alive, connection TTL) and wrapped in an
 * {@link AmazonSQSBufferedAsyncClient} with long polling enabled.
 */
@Configuration
public class SQSConfiguration {
    /** The properties cache config. */
    @Autowired
    private PropertiesCacheConfig propertiesCacheConfig;

    /**
     * Builds the SQS client used by the application.
     *
     * <p>Credentials come from the EC2 instance profile when
     * {@code propertiesCacheConfig.isIamRole()} is true, otherwise from static keys.
     * Both paths use the same {@link ClientConfiguration}.
     *
     * @return a buffered asynchronous SQS client bound to the configured region and endpoint
     */
    @Bean
    public AmazonSQSAsync amazonSQSClient() {
        // Create Client Configuration
        final ClientConfiguration clientConfig = new ClientConfiguration()
            .withMaxErrorRetry(5)
            .withConnectionTTL(10_000L)
            .withTcpKeepAlive(true)
            .withRetryPolicy(new RetryPolicy(
                // NOTE(review): a null retry condition is presumably replaced by the
                // SDK's default condition - confirm against the SDK version in use.
                null,
                new BackoffStrategy() {
                    @Override
                    public long delayBeforeNextRetry(AmazonWebServiceRequest req,
                            AmazonClientException exception, int retries) {
                        // Delay between retries is 10s unless it is UnknownHostException
                        // for which retry is 60s
                        return exception.getCause() instanceof UnknownHostException ? 60_000L : 10_000L;
                    }
                },
                // NOTE(review): per the RetryPolicy constructor these are maxErrorRetry and
                // honorMaxErrorRetryInClientConfig; with the flag set, the value passed to
                // withMaxErrorRetry above is expected to take precedence - confirm.
                10, true));

        // Create Amazon client
        final AmazonSQSAsync asyncSqsClient;
        if (propertiesCacheConfig.isIamRole()) {
            asyncSqsClient = new AmazonSQSAsyncClient(new InstanceProfileCredentialsProvider(true), clientConfig);
        } else {
            // Pass clientConfig here as well; previously this branch ignored it, so the
            // retry policy, keep-alive and TTL above were silently dropped. The SDK has no
            // (AWSCredentials, ClientConfiguration) constructor, so the executor is supplied
            // explicitly, sized by the configured max connections.
            // NOTE(review): credentials are hard-coded placeholders and BasicAWSCredentials
            // expects (accessKey, secretKey) - load real values from secure configuration.
            asyncSqsClient = new AmazonSQSAsyncClient(
                    new BasicAWSCredentials("sceretkey", "accesskey"),
                    clientConfig,
                    java.util.concurrent.Executors.newFixedThreadPool(clientConfig.getMaxConnections()));
        }
        final Regions regions = Regions.fromName(propertiesCacheConfig.getRegionName());
        asyncSqsClient.setRegion(Region.getRegion(regions));
        // The explicit endpoint is applied after the region.
        asyncSqsClient.setEndpoint(propertiesCacheConfig.getEndPoint());

        // Buffer for request batching
        final QueueBufferConfig bufferConfig = new QueueBufferConfig();
        // Ensure visibility timeout is maintained
        bufferConfig.setVisibilityTimeoutSeconds(20);
        // Enable long polling
        bufferConfig.setLongPoll(true);

        return new AmazonSQSBufferedAsyncClient(asyncSqsClient, bufferConfig);
    }

}

然后编写一个调度程序,它每2秒执行一次,从队列中获取数据并进行处理,并且必须在可见性超时(Visibility Timeout)到期之前将消息从队列中删除;否则,当可见性超时到期后,该消息将再次变为可被处理的状态。

package com.sxm.aota.tsc.sqs;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.databind.ObjectMapper;

    /**
     * Scheduled SQS consumer.
     *
     * <p>Resolves the queue URL once at start-up, then on every scheduled run fetches at
     * most one message from the queue, deserializes its body into {@code Vehicles},
     * processes it and deletes it from the queue.
     */
    @EnableScheduling
    @Component("sqsScheduledTask")
    @DependsOn({ "propertiesCacheConfig", "amazonSQSClient" })
    public class SQSScheduledTask {

        private static final Logger LOGGER = LoggerFactory.getLogger(SQSScheduledTask.class);
        @Autowired
        private PropertiesCacheConfig propertiesCacheConfig;
        @Autowired
        public AmazonSQSAsync amazonSQSClient;

        /** URL of the queue, resolved in {@link #initialize()}. */
        private String queueUrl;
        private final ObjectMapper mapper = new ObjectMapper();

        /**
         * Looks up the URL of the configured queue and caches it for later calls.
         *
         * @throws Exception if the queue URL cannot be resolved
         */
        @PostConstruct
        public void initialize() throws Exception {
            final String queueName = propertiesCacheConfig.getSQSQueueName();
            // Use {} placeholders: SLF4J drops arguments that have no placeholder in the
            // format string, so the previous calls only ever logged "SQS-Publisher".
            LOGGER.info("SQS-Publisher: Publisher initializing for queue {}", queueName);
            // Get queue URL
            final GetQueueUrlRequest request = new GetQueueUrlRequest().withQueueName(queueName);
            final GetQueueUrlResult response = amazonSQSClient.getQueueUrl(request);
            queueUrl = response.getQueueUrl();

            LOGGER.info("SQS-Publisher: Publisher initialized for queue {}, URL = {}", queueName, queueUrl);
        }

        /**
         * Timer task run after each configured delay ({@code sqs.consumer.delay}).
         * Fetches a message, parses it and hands it to
         * {@link #processMessage(List, String)}. Failures are logged, not rethrown.
         */
        @Scheduled(fixedDelayString = "${sqs.consumer.delay}")
        public void timerTask() {

            final ReceiveMessageResult receiveResult = getMessagesFromSQS();
            if (receiveResult == null || receiveResult.getMessages() == null
                    || receiveResult.getMessages().isEmpty()) {
                return;
            }

            String messageBody = null;
            try {
                messageBody = receiveResult.getMessages().get(0).getBody();
                final String messageReceiptHandle = receiveResult.getMessages().get(0).getReceiptHandle();
                final Vehicles vehicles = mapper.readValue(messageBody, Vehicles.class);
                processMessage(vehicles.getVehicles(), messageReceiptHandle);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so the scheduler thread can observe it.
                    Thread.currentThread().interrupt();
                }
                // Pass the exception as the last argument so the stack trace is logged.
                LOGGER.error("Exception while processing SQS message : {}", messageBody, e);
                // Message is not deleted on SQS and will be processed again after visibility timeout
            }
        }

        /**
         * Processes the vehicles of one message and then deletes that message from the queue.
         *
         * @param vehicles vehicles carried by the message
         * @param messageReceiptHandle receipt handle identifying the message to delete
         * @throws InterruptedException declared for the processing step
         */
        public void processMessage(List<Vehicle> vehicles, String messageReceiptHandle) throws InterruptedException {
            //processing code
            //delete the sqs message as the processing is completed
            //Need to create atomic counter that will be incremented by all TS.. Once it will be 0 then we will be deleting the messages

            amazonSQSClient.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));
        }

        /**
         * Requests at most one message from the queue.
         *
         * @return the receive result, or {@code null} if the request failed
         */
        private ReceiveMessageResult getMessagesFromSQS() {
            try {
                // Create new request and fetch data from Amazon SQS queue
                final ReceiveMessageRequest request = new ReceiveMessageRequest()
                        .withMaxNumberOfMessages(1)
                        .withQueueUrl(queueUrl);
                return amazonSQSClient.receiveMessage(request);
            } catch (Exception e) {
                LOGGER.error("Error while fetching data from SQS", e);
            }
            return null;
        }

    }

最新更新