RMQ async consumer: pika.adapters.base_connection, _handle_error, Fatal Socket Error: error(9, 'Bad file descriptor')



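I have a RabbitMQ (version 3.2.4) asynchronous consumer (as described here) that listens on a queue/routing key. It ran without any problems until I recently made some changes.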

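Some of the tasks are time consuming, so I decided to use the multiprocessing library to spin off child processes that perform these intensive tasks through a multiprocessing Queue/Pool design, so that my main task can continue without waiting.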

my_queue = multiprocessing.Queue()
my_pool = multiprocessing.Pool(2, my_method, (my_queue,))

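Once the queue and pool are initialized, I pass the queue as an argument when initializing the consumer (the ExampleConsumer __init__ method, as in the example linked above). Then, in the on_message method, I push the message onto my_queue so that the time-consuming task gets executed.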

Edit:

Some code samples:

def main():
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    my_queue = multiprocessing.Queue()
    my_pool = multiprocessing.Pool(2, my_class().my_method, (my_queue,))
    example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F', my_queue)
    try:
        example.run()
        my_pool.close()
        my_pool.join()
    except KeyboardInterrupt:
        my_pool.terminate()
        example.stop()

The consumer's __init__ and on_message methods:

def __init__(self, amqp_url, queue):
    """Create a new instance of the consumer class, passing in the AMQP
    URL used to connect to RabbitMQ.
    :param str amqp_url: The AMQP url to connect with
    """
    self._connection = None
    self._channel = None
    self._closing = False
    self._consumer_tag = None
    self._url = amqp_url
    self.queue = queue

def on_message(self, unused_channel, basic_deliver, properties, body):
    """Invoked by pika when a message is delivered from RabbitMQ. The
    channel is passed for your convenience. The basic_deliver object that
    is passed in carries the exchange, routing key, delivery tag and
    a redelivered flag for the message. The properties passed in is an
    instance of BasicProperties with the message properties and the body
    is the message that was sent.
    :param pika.channel.Channel unused_channel: The channel object
    :param pika.Spec.Basic.Deliver: basic_deliver method
    :param pika.Spec.BasicProperties: properties
    :param str|unicode body: The message body
    """
    LOGGER.info('Received message # %s from %s: %s',
                basic_deliver.delivery_tag, properties.app_id, body)
    self.acknowledge_message(basic_deliver.delivery_tag)
    self.queue.put(str(body))

After making these changes, I started seeing exceptions like the following:

File "consumer_new.py", line 500, in run
self._connection.ioloop.start()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 355, in start
self.process_timeouts()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 283, in process_timeouts
timer['callback']()
File "consumer_new.py", line 290, in reconnect
self._connection.ioloop.start()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 354, in start
self.poll()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 602, in poll
self._process_fd_events(fd_event_map, write_only)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 443, in _process_fd_events
handler(fileno, events, write_only=write_only)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 364, in _handle_events
self._handle_read()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 415, in _handle_read
self._on_data_available(data)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1347, in _on_data_available
self._process_frame(frame_value)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1427, in _process_frame
self._deliver_frame_to_channel(frame_value)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1028, in _deliver_frame_to_channel
return self._channels[value.channel_number]._handle_content_frame(value)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 896, in _handle_content_frame
self._on_deliver(*response)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 983, in _on_deliver
header_frame.properties, body)
File "consumer_new.py", line 452, in on_message
self.acknowledge_message(basic_deliver.delivery_tag)
File "consumer_new.py", line 463, in acknowledge_message
self._channel.basic_ack(delivery_tag)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 159, in basic_ack
return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 1150, in _send_method
self.connection._send_method(self.channel_number, method_frame, content)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1569, in _send_method
self._send_frame(frame.Method(channel_number, method_frame))
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1554, in _send_frame
self._flush_outbound()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 282, in _flush_outbound
self._handle_write()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 452, in _handle_write
return self._handle_error(error)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 338, in _handle_error
self._handle_disconnect()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 288, in _handle_disconnect
self._adapter_disconnect()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 94, in _adapter_disconnect
self.ioloop.remove_handler(self.socket.fileno())
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 579, in remove_handler
super(PollPoller, self).remove_handler(fileno)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 328, in remove_handler
self.update_handler(fileno, 0)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 571, in update_handler
self._poll.modify(fileno, events)
IOError: [Errno 9] Bad file descriptor

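The run() method keeps running in the main process without any intervention. If that is the case, I don't understand why I get a bad file descriptor error, since nobody else could be closing the RMQ connection. Also, the consumer seems to run for 3-4 hours without any issue before it fails with the error above.

I checked in the RabbitMQ UI whether it was running out of file descriptors, but that does not seem to be the problem. I cannot figure out what might be going wrong.

Any help is much appreciated! Thanks.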

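Pika is not thread safe; the documentation says so clearly. If you do anything with the connection or channel from a thread or a subprocess, all sorts of things will eventually go wrong and your program will crash with strange and uninformative errors. It may appear to work for a while, but eventually Pika's internal structures get corrupted.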

If you need both multiprocessing and RabbitMQ, you have a few options.

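  1. Use rabbitpy instead of Pika. I have not used it, so I cannot comment on how well it would suit you, but it is thread safe.
  2. If you can, split the tasks so that each subprocess can open its own Pika connection. This does not work if your main program receives the request, has a subprocess handle it, and then sends the result. For example, if you need to send acknowledgements, you cannot receive a message in the main process and have a subprocess acknowledge it.
  3. Remove Pika from the subprocesses. If the idea of your subprocesses is to hand computation or time-consuming tasks off to them, you can create two queues, one for subprocess input and one for output, and have the subprocesses return their results to the main program through the output queue. The main program can then handle the RabbitMQ traffic based on that.
  4. If your program is some kind of server that processes requests, split everything into subprocesses (the "work queue" model, https://www.rabbitmq.com/tutorials/tutorial-two-python.html) and have each subprocess subscribe independently as a consumer of the queue. RabbitMQ takes care of round-robin dispatching, and by limiting prefetch you can make a subprocess pick up only one task and take no other until that task is finished. This ensures that a task sent right after the first one is picked up by an idle thread or subprocess. In this model your main process does not need a Pika connection at all, and each subprocess has its own independent connection, as in option 2.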
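Option 3 could look roughly like the sketch below. It is only an illustration under assumptions, not code from the question: the names worker, drain_results, task_queue and result_queue are made up, body.upper() stands in for the real time-consuming work, and the Pika calls are shown only as comments. The sketch is written for Python 3. The point is that the children only ever see plain multiprocessing queues, while the main process keeps the channel to itself and acknowledges a message once its result has come back.

```python
import multiprocessing
import queue  # only needed for the queue.Empty exception
import time


def worker(task_queue, result_queue):
    """Runs in a child process and never touches Pika."""
    # iter(get, None) keeps reading until the None stop marker arrives.
    for delivery_tag, body in iter(task_queue.get, None):
        result = body.upper()  # placeholder for the time-consuming work
        result_queue.put((delivery_tag, result))


def drain_results(result_queue):
    """Collect every finished result without blocking.

    Meant to be called from the main process only, for example from a
    periodic timer on the connection's ioloop.
    """
    finished = []
    while True:
        try:
            finished.append(result_queue.get_nowait())
        except queue.Empty:
            return finished


if __name__ == "__main__":
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker, args=(task_queue, result_queue))
        for _ in range(2)
    ]
    for process in workers:
        process.start()

    # In the real consumer, on_message would do this put instead of
    # acknowledging the message right away.
    messages = ["first", "second", "third"]
    for delivery_tag, body in enumerate(messages, start=1):
        task_queue.put((delivery_tag, body))

    pending = len(messages)
    while pending:
        for delivery_tag, result in drain_results(result_queue):
            # Here the main process would call basic_ack(delivery_tag)
            # on its own channel.
            print("done", delivery_tag, result)
            pending -= 1
        time.sleep(0.1)  # stands in for the ioloop timer interval

    for _ in workers:
        task_queue.put(None)  # one stop marker per worker
    for process in workers:
        process.join()
```

The results are drained before the workers are joined, because a child that still has unflushed items on a multiprocessing queue may not exit until they are consumed.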

Hope this helps.

Hannu
