我正在收集队列中的数据以进行处理。我的目标是不断处理数据,不让错误导致应用程序崩溃,所以我会记录异常,并尝试让程序继续运行。为此,我将cosume语句嵌套在一个无限循环中,但它似乎不起作用。我经常会来到这个程序,看到它说"[x]完成",然后等待,我可以看到队列中有大量数据。
以下是我的代码片段:
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
doWork(body)
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
while True:
try:
channel.start_consuming()
except:
time.sleep(10)
我做错了什么?如果我的队列有3000个条目,这将占10-15%,那么出于某种原因,它就会挂起。我的while循环有问题吗?
您应该在回调中进行错误处理。我不确定在发生错误后再次调用start_consuming()是否合法(它的内部状态可能处于某种错误状态)。你应该记录你得到的错误,这样你就知道发生了什么,并且可以完善异常处理程序,只捕捉可恢复的错误。我无法对此进行测试,所以请原谅任何小错误。
import logging
import traceback
# NOTE: Just a simple logging config here, you can get fancier
logging.basicConfig(level=logging.DEBUG)
def callback(ch, method, properties, body):
logger = logging.getLogger('callback')
try:
logger.info(" [x] Received %r" % (body,))
doWork(body)
logger.info(" [x] Done")
except Exception, e:
# get granular over time as you learn what
# errors you get because some things like
# SyntaxError should not be dropped
logger.error("Exception %s: %s" %(type(e),e))
logger.debug(traceback.format_exc())
finally:
# set to always ack... even on failure
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
channel.start_consuming()
我看到您在这里使用RabbitMQ。如果是这样,这就是你必须做的:
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
doWork(body)
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
channel.start_consuming()
Yes,no while True循环以包装start_consuming函数。'
参考RabbitMQ教程