实现每秒发送消息的机制



我需要每秒向用户发送来自rabbitMQ的消息。出于这些目的,我使用了2个线程。第一个从rabbitMQ接收消息并将其排队。第二个线程从队列中获取消息,对其进行处理,并通过web套接字将其发送给用户。我的问题是如何最好地执行这一机制。现在我的代码看起来是这样的:

def __init__(self):
self.data = {}
self.queue = Queue()
def download_data(self):
started_time = time.perf_counter()
while True:
if time.perf_counter() - started_time >= 1:
started_time = time.perf_counter()
for _ in range(self.queue.qsize()):
self.append_data(self.queue.get())
self.sync_send_data_to_user(self.data)
self.data = {}

def message_handle(self, ch, method, properties, body):
message = json.loads(body)
self.queue.put(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consuming(self):
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
channel = connection.channel()
rabbit_queue = channel.queue_declare("to_client")
channel.basic_consume(on_message_callback=self.message_handle, queue="to_client")
threading.Thread(target=channel.start_consuming, args=[]).start()

此代码有效,但有时它每秒发送消息的次数超过1次。第一个线程是rabbitMQ消费者回调函数message_handle,第二个线程是无限循环函数download_datamessage_handle检查是否超过1秒,若发生这种情况,message_handle将阻塞使用队列的线程,直到从队列中检索到所有项目为止。

更新:我想我让自己更难了,我最终改变了一些逻辑。现在我有两个线程,其中一个线程处理来自rabbitmq的消息,并将它们发送到队列,函数message_handle。python中的队列是线程安全的,所以它应该可以工作。第二个线程检查是否已过1秒,如果已过,则从队列中检索所有数据并将其发送给用户,函数download_data

  1. 您将异步IO(协同程序(与线程混合在一起。这两种方法是完全不同的并发方法,会产生不同的行为。一种更合理的方法是只使用这两种方法中的一种来开发解决方案。我会选择一个基于异步的解决方案,并确保尽可能使用await,而不会阻塞事件循环。

  2. Python存在GIL问题。所以,即使你认为你在同时做一些事情,事实上,你在任何给定的时间都绑定到一个线程执行。如果我添加上一节(创建多个线程(,您可能会遇到意外的时间问题。Python确实不是最适合完美计时的(它也深受本地执行环境/负载的影响(

  3. 如果您关心确保真正的并发执行,您可能会了解python多处理。同样,结果可能会有所不同。

最新更新