我用python编写了以下兔子mq的工作生产者消费者代码。但我有一个转折点。使用者每 0.5 秒连续将数据放入队列,但现在我希望我的使用者每 3 秒唤醒一次,并获取发布者放入队列的所有 6 个数据并再次休眠 3 秒。我想为此进行无限循环。
但我不确定如何在兔子 mq 中实现这一目标
制作人
import pika
import time
import datetime
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
value=str(int(time.time()))
for i in range (1000):
channel.basic_publish(exchange='',routing_key='hello',body='{"action": "print", "method": "onData", "data": "Madan Mohan"}')
time.sleep(0.5)
connection.close()
消费者
#!/usr/bin/env python
import pika
import time
import json
import datetime
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
#print " current time: %s " % (str(int((time.time())*1000)))
d=json.loads(body)
print d
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
在回调中使用睡眠的第一个解决方案。但这可能不是一个好的解决方案,因为basic_consume旨在尽可能快地(异步)获取消息。
got = 0
def callback(ch, method, properties, body):
#print " current time: %s " % (str(int((time.time())*1000)))
d=json.loads(body)
print d
got = got + 1
if got == 6
got = 0
time.sleep(3)
使用channel.basic_get。同步获取消息是更合适的解决方案。
got = 0
while True
channel.basic_get(callback,
queue='hello',
no_ack=True)
got = got + 1
if got == 6
got = 0
time.sleep(3)