RabbitMQ pika:无法停止消费流队列,因为流队列不支持 nack 或 reject



我的应用程序有一个 Stream 队列,我希望它在满足特定条件时停止消费。由于使用 BlockingConnection 时 start_consuming 会一直阻塞,似乎唯一的退出方法就是在回调中的某处调用 stop_consuming。

不幸的是,这行不通,因为 stop_consuming 会拒绝所有挂起的消息(引自 pika 文档):

注意:挂起的不可确认消息将丢失;挂起的可确认消息将被拒绝。

……而流队列不支持拒绝(reject)操作,下面这条错误消息证实了这一点:

operation basic.reject caused a connection exception not_implemented: "basic.nack and basic.reject not supported by stream queues queue 'stream-queue' in vhost '/'"

这里有一个最小的例子。服务器代码:

#!/usr/bin/env python
"""Minimal reproduction: calling stop_consuming() on a stream-queue consumer.

The script declares the stream queue "stream-queue", registers a consumer on
it and stops consuming from inside the message callback. The behaviour is
intentionally left exactly as reported; only the indentation that was lost in
the paste has been restored.
"""
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel_stream = connection.channel()
channel_stream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel_stream.basic_qos(
    prefetch_count=1,
)


class Server(object):
    """Consumer for "stream-queue" that stops after the first delivery."""

    def __init__(self):
        """Register ``stream_callback`` as the consumer of "stream-queue"."""
        channel_stream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )

    def stream_callback(self, channel, method, props, body):
        """Print the delivered message, then stop consuming.

        Args:
            channel: Channel the message was delivered on (unused here).
            method: Delivery metadata; ``routing_key`` is printed.
            props: Message properties (unused).
            body: Message payload as bytes; decoded for printing.
        """
        print(f"received '{body.decode()}' via {method.routing_key}")
        # This is the call under investigation: per the reported traceback it
        # ends in ConnectionClosedByBroker (540, NOT_IMPLEMENTED).
        channel_stream.stop_consuming()


server = Server()
try:
    # Blocks until the consumer is stopped from inside the callback.
    channel_stream.start_consuming()
except KeyboardInterrupt:
    connection.close()

而客户端代码:

#!/usr/bin/env python
"""Publish two messages to "stream-queue" for the server script to consume."""
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
try:
    channel_stream = connection.channel()
    channel_stream.queue_declare(
        "stream-queue",
        durable=True,
        arguments={
            'x-queue-type': 'stream',
        }
    )
    # This publishes twice: once to trigger the server's callback
    # and again to get a message in the queue that the server is forced
    # to nack, causing a crash.
    for _ in range(2):
        channel_stream.basic_publish(
            exchange='',
            routing_key='stream-queue',
            body=b"stream data"
        )
finally:
    # Release the connection even if declaring or publishing failed; skip the
    # close when the connection is already gone so the original error is not
    # masked by a second one.
    if connection.is_open:
        connection.close()

以及完整输出:

~/anaconda3/envs/py310/bin/python ~/workspace/rabbitmq_train/stream_bug/server_stream_only.py 
received 'stream data' via stream-queue
Traceback (most recent call last):
File "~/workspace/rabbitmq_train/stream_bug/server_stream_only.py", line 36, in <module>
channel_stream.start_consuming()
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1880, in start_consuming
self._process_data_events(time_limit=None)
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 2041, in _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 848, in process_data_events
self._dispatch_channel_events()
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
impl_channel._get_cookie()._dispatch_events()
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1507, in _dispatch_events
consumer_info.on_message_callback(self, evt.method,
File "~/workspace/rabbitmq_train/stream_bug/server_stream_only.py", line 30, in stream_callback
channel_stream.stop_consuming()
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1893, in stop_consuming
self._cancel_all_consumers()
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1494, in _cancel_all_consumers
self.basic_cancel(consumer_tag)
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1802, in basic_cancel
self._flush_output(
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1350, in _flush_output
self._connection._flush_output(lambda: self.is_closed, *waiters)
File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
raise self._closed_result.value.error
pika.exceptions.ConnectionClosedByBroker: (540, "NOT_IMPLEMENTED - basic.nack and basic.reject not supported by stream queues queue 'stream-queue' in vhost '/'")
Process finished with exit code 1

我是这样启动 RabbitMQ 服务器的:

docker run -it --rm -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

不同类型的队列不能共用同一个 channel。一般来说,共用 channel 也不是好的做法:每个消费者都应该使用自己的 channel。

解决您的问题:

#!/usr/bin/env python
"""Consume a classic queue and a stream queue through separate channels.

Both channels share one connection. Each callback acknowledges the message
it received and then stops the consumer on the channel that delivered it.
"""
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
# One channel per consumer: the stream queue gets a channel of its own.
channel = connection.channel()
channel_stream = connection.channel()
channel_stream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel.queue_declare(
    "normal-queue"
)
channel_stream.basic_qos(
    prefetch_count=1,
)


class Server:
    """Registers one consumer per queue, each on its dedicated channel."""

    def __init__(self):
        """Subscribe to "stream-queue" and "normal-queue"."""
        channel_stream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )
        channel.basic_consume(
            queue="normal-queue",
            on_message_callback=self.normal_callback,
        )

    def stream_callback(self, channel, method, props, body):
        """Print and acknowledge a stream message, then stop that consumer.

        Args:
            channel: Channel the message was delivered on.
            method: Delivery metadata (``routing_key``, ``delivery_tag``).
            props: Message properties (unused).
            body: Message payload as bytes.
        """
        print(f"received '{body}' via {method.routing_key}")
        # Use the channel handed to the callback rather than a module global,
        # so the ack always goes to the channel that delivered the message.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        channel.stop_consuming()

    def normal_callback(self, channel, method, props, body):
        """Print and acknowledge a classic-queue message, then stop.

        Args:
            channel: Channel the message was delivered on.
            method: Delivery metadata (``routing_key``, ``delivery_tag``).
            props: Message properties (unused).
            body: Message payload as bytes.
        """
        print(f"received '{body}' via {method.routing_key}")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        channel.stop_consuming()


server = Server()
try:
    channel.start_consuming()
    channel_stream.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C is an expected way to end the script; cleanup happens below.
    pass
finally:
    # Close the connection on every exit path, not only on Ctrl+C. Skip it
    # when the broker already closed the connection so that error is not
    # masked by a second one.
    if connection.is_open:
        connection.close()

多线程版本如下:

#!/usr/bin/env python
"""Consume a classic queue and a stream queue in parallel, one thread each.

Every consumer thread works on its own connection and channel. Each callback
acknowledges the message it received and then stops its own consumer.
"""
import threading

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
connection_stream = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel_stream = connection_stream.channel()
channel_stream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel.queue_declare(
    "normal-queue"
)
channel_stream.basic_qos(
    prefetch_count=1,
)


class Server:
    """Callbacks and thread entry points for the two consumers."""

    def stream_callback(self, channel, method, props, body):
        """Print and acknowledge a stream message, then stop that consumer.

        Args:
            channel: Channel the message was delivered on.
            method: Delivery metadata (``routing_key``, ``delivery_tag``).
            props: Message properties (unused).
            body: Message payload as bytes.
        """
        print(f"stream_callback '{body}' via {method.routing_key}")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        channel.stop_consuming()

    def normal_callback(self, channel, method, props, body):
        """Print and acknowledge a classic-queue message, then stop.

        Args:
            channel: Channel the message was delivered on.
            method: Delivery metadata (``routing_key``, ``delivery_tag``).
            props: Message properties (unused).
            body: Message payload as bytes.
        """
        print(f"normal_callback '{body}' via {method.routing_key}")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        channel.stop_consuming()

    def start_consume_normal(self):
        """Thread target: consume "normal-queue" until the callback stops it."""
        print("start_consume_normal ")
        channel.basic_consume(
            queue="normal-queue",
            on_message_callback=self.normal_callback,
        )
        channel.start_consuming()

    def start_consume_stream(self):
        """Thread target: consume "stream-queue" until the callback stops it."""
        channel_stream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )
        print("start_consume_stream ")
        channel_stream.start_consuming()


def _request_stop(conn, chan):
    """Ask the thread that owns *conn* to stop consuming on *chan*.

    pika connections must only be driven by one thread, so the stop request
    is scheduled on the connection's own thread instead of being called here.
    """
    if conn.is_open:
        conn.add_callback_threadsafe(chan.stop_consuming)


server = Server()
normal_worker = threading.Thread(target=server.start_consume_normal)
stream_worker = threading.Thread(target=server.start_consume_stream)
try:
    normal_worker.start()
    print("start_consume_normal")
    stream_worker.start()
    print("start_consume_stream")
    input()
except KeyboardInterrupt:
    _request_stop(connection, channel)
    _request_stop(connection_stream, channel_stream)
finally:
    # Wait for the consumer threads before touching their connections from
    # this thread. is_alive() is False for a thread that was never started.
    for worker in (normal_worker, stream_worker):
        if worker.is_alive():
            worker.join()
    # Close both connections (the original closed only the first one).
    for conn in (connection, connection_stream):
        if conn.is_open:
            conn.close()

关于多线程的注意事项

相关内容

最新更新