我想在pika basic_sume中运行一个消息回调异步。这可能吗?怎么可能?我们已经在为其他任务运行异步循环,这个使用者使用httpx和异步连接来调用内部服务。
这是我们目前的消费者类别:
class Consumer:
"""
https://www.devmashup.com/creating-a-rabbitmq-consumer-in-python/
"""
connection: AsyncioConnection
channel: Any
routing_key: str
def __init__(self, routing_key) -> None:
self.connection = self.__create_connection()
self.channel = self.connection.channel()
self.__create_exchange()
self.routing_key = routing_key
@staticmethod
def __create_connection():
credentials = PlainCredentials(
settings.mqtt_vhost_user, settings.mqtt_vhost_password
)
parameters = ConnectionParameters(
settings.mqtt_host, settings.mqtt_port, settings.mqtt_vhost, credentials
)
return AsyncioConnection(parameters)
def close_connection(self):
self.connection.close()
def __create_exchange(self):
self.channel.exchange_declare(
exchange=settings.mqtt_exchange,
exchange_type=settings.mqtt_exchange_type,
passive=False,
durable=True,
auto_delete=False,
internal=False,
)
def consume(self, message_received_callback):
logger.info(f"Started consumer for {self.routing_key}")
self.channel.queue_declare(
queue=self.routing_key,
passive=False,
durable=True,
exclusive=False,
auto_delete=False,
)
self.channel.queue_bind(
queue=self.routing_key,
exchange=settings.mqtt_exchange,
routing_key=self.routing_key,
)
async def consume_message(channel, method, properties, body):
await message_received_callback(body)
channel.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(
self.routing_key,
consume_message,
)
self.channel.start_consuming()
对于async+rabbitmq,我建议您使用aiormq。它是开箱即用的async,与asyncio一起工作很顺利,而且当你已经了解pika时,api是直观的/类似的。
以下是如何创建一个简单的消费者