Python Multiprocessing Reading from SQS



我有以下代码可以启动多个不断从 SQS 队列轮询的 python 进程。

这些进程启动时

num_processes = range(1, 9)
    for p_num in num_processes:
        p = multiprocessing.Process(
            target=sqs_polling, args=(queue_name, p_num,))
        p.start()

实际的轮询函数是

def sqs_polling(queue_name, process_id):
    sqs = boto3.resource('sqs', region_name='us-east-1')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    no_messages = False
    # poll sqs forever
    while 1:
        # polling delay so aws does not throttle us
        sleep(2.0)
        # sleep longer if there are no messages on the queue the last time it was polled
        if no_messages:
            sleep(900.0)
        message_batch = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
        if len(message_batch) == 0:
            no_messages = True
        else:
            no_messages = False
        # process messages
        for message in message_batch:
            do_something(message)
            message.delete()

这似乎工作了几个小时,但最终似乎 SQS 限制了进程,即使队列中存在消息,也无法读取任何消息。 为了帮助减少这种情况,我在队列读取之间有 2 秒的超时。 此外,如果没有读取消息,我还创建了 15 分钟的超时。 尽管如此,我仍然会受到限制。 谁能解释为什么这里仍然发生节流? 另一种可能性可能是与队列的连接过时,但我认为这不太可能。

这个问题

有点过时了,但我刚刚发布了multi_sqs_listener,它提供了一种高级、多线程的方式来监听来自 Python 代码的多个 SQS 队列。

import time
from multi_sqs_listener import QueueConfig, EventBus, MultiSQSListener

class MyListener(MultiSQSListener):
    def low_priority_job(self, message):
        print('Starting low priority, long job: {}'.format(message))
        time.sleep(5)
        print('Ended low priority job: {}'.format(message))
    def high_priority_job(self, message):
        print('Starting high priority, quick job: {}'.format(message))
        time.sleep(.2)
        print('Ended high priority job: {}'.format(message))
    def handle_message(self, queue, bus, priority, message):
        if bus == 'high-priority-bus':
            self.high_priority_job(message.body)
        else:
            self.low_priority_job(message.body)
low_priority_bus = EventBus('low-priority-bus', priority=1)
high_priority_bus = EventBus('high-priority-bus', priority=5)
EventBus.register_buses([low_priority_bus, high_priority_bus])
low_priority_queue = QueueConfig('low-priority-queue', low_priority_bus)
high_priority_queue = QueueConfig('high-priority-queue', high_priority_bus)
my_listener = MyListener([low_priority_queue, high_priority_queue])
my_listener.listen()

相关内容

  • 没有找到相关文章

最新更新