Infinite loop to keep consuming from a queue



I am pulling data from a queue for processing. My goal is to keep processing continuously and not let an error bring the application down, so I log exceptions and try to keep the program running. To do that I wrapped the consume call inside an infinite loop, but it does not seem to work. I regularly come back to the program and find it has printed " [x] Done" and is just sitting there waiting, even though I can see plenty of data left in the queue.

Here is a snippet of my code:

import time

# channel is assumed to be an already-open pika channel (connection setup not shown)
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    doWork(body)
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
while True:
    try:
        channel.start_consuming()
    except:
        time.sleep(10)

What am I doing wrong? If the queue has 3000 entries, it works through maybe 10-15% of them and then, for some reason, just hangs. Is there something wrong with my while loop?

You should do the error handling inside your callback. I'm not sure it is even legal to call start_consuming() again after a failure (its internal state may be left in an error state). You should log the errors you get so you know what is happening, and over time refine the exception handler so it only catches recoverable errors. I can't test this, so forgive any minor mistakes.

import logging
import traceback
# NOTE: Just a simple logging config here, you can get fancier
logging.basicConfig(level=logging.DEBUG)
def callback(ch, method, properties, body):
    logger = logging.getLogger('callback')
    try:
        logger.info(" [x] Received %r" % (body,))
        doWork(body)
        logger.info(" [x] Done")
    except Exception as e:
        # get granular over time as you learn what
        # errors you get because some things like
        # SyntaxError should not be dropped
        logger.error("Exception %s: %s" %(type(e),e))
        logger.debug(traceback.format_exc())
    finally:
        # set to always ack... even on failure
        ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
channel.start_consuming()
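
Once the logs show which errors actually occur, the blanket except Exception can be narrowed. Below is a minimal sketch of that idea, assuming a hypothetical RecoverableError that doWork raises for failures that are safe to skip; anything unexpected is logged with its traceback and re-raised so it is not silently dropped:

class RecoverableError(Exception):
    # hypothetical marker exception for failures that are safe to skip
    pass

def callback(ch, method, properties, body):
    logger = logging.getLogger('callback')
    try:
        logger.info(" [x] Received %r" % (body,))
        doWork(body)
        logger.info(" [x] Done")
    except RecoverableError as e:
        # known, recoverable failure: record it and move on to the next message
        logger.error("Recoverable error: %s" % e)
    except Exception:
        # anything unexpected (e.g. a programming error): log the full
        # traceback and re-raise instead of swallowing it
        logger.debug(traceback.format_exc())
        raise
    finally:
        # still ack in every case so the message is not redelivered forever
        ch.basic_ack(delivery_tag = method.delivery_tag)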

I see you are using RabbitMQ here. If so, this is what you need to do:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    doWork(body)
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
channel.start_consuming()

Yes: no while True loop wrapping the start_consuming call.

See the RabbitMQ tutorials for reference.
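
For completeness, the snippets above assume a connection and channel have already been set up. Here is a rough sketch of that surrounding setup in the style of the RabbitMQ work-queues tutorial, kept in the same Python 2 / older-pika style as the snippets above; the host, credentials, and queue_declare arguments are assumptions and must match your broker and the way dataProcessingQueue was originally declared:

import pika

# assumes RabbitMQ is reachable on localhost; adjust host/credentials as needed
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# queue_declare is idempotent, but its arguments (e.g. durable) must match
# how dataProcessingQueue was declared in the first place
channel.queue_declare(queue='dataProcessingQueue', durable=True)

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    doWork(body)      # your existing processing function
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

# hand this worker at most one unacknowledged message at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')

# blocks here and dispatches incoming messages to callback
channel.start_consuming()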
