PubSub Python客户端-优雅地关闭订阅者



我在python3.6中使用Google Pub/Sub客户端v2.2.0作为订阅者。

我希望我的应用程序在打包它已经收到的所有消息后优雅地关闭。

来自Google指南的订阅者示例代码,稍作修改将显示我的问题:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from time import sleep
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message}.")
sleep(30)
message.ack()
print("Acked")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
sleep(10)
streaming_pull_future.cancel()
streaming_pull_future.result()

从https://cloud.google.com/pubsub/docs/pull

我希望这段代码停止提取消息并完成正在运行的消息,然后退出。

实际上,这段代码停止提取消息并完成执行正在运行的消息,但它不返回消息。.ack()发生了,但是服务器没有接收到ack,因此下次运行时将再次返回相同的消息。

1。为什么服务器没有收到应答?

2。如何优雅地关闭订阅者?

3。.cancel()的预期行为是什么?

Update (v2.4.0+)

客户端版本2.4.0增加了一个新的可选参数await_msg_callbacks到流pull future的cancel()方法。如果设置为True,该方法将阻塞,直到所有当前执行的消息回调完成,后台消息流已经关闭(默认为False)。

try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel(await_msg_callbacks=True)  # blocks until done

几个发布说明:

  • 等待回调意味着在它们中生成的任何消息ack仍将被处理(读取:发送到后端)。
  • 如果await_msg_callbacksFalse或未给定,则不等待关机。在cancel()返回后,回调可能仍然在后台运行,但是它们生成的任何ACK都不会产生影响,因为不再有任何线程运行将ACK请求发送到后端。
  • 位于客户端内部队列中的消息现在在关机时自动NACK-ed。无论await_msg_callbacks的值如何,都会发生这种情况。

原始答案(v2.3.0及以下版本)

流拉在后台由流拉管理器管理。当流pull future被取消时,它调用管理器的close()方法,优雅地关闭后台助手线程。

被关闭的东西之一是调度器——它是一个线程池,用于异步地将接收到的消息分派给用户回调。需要注意的关键是scheduler.shutdown()执行而不是等待用户回调完成,因为它可能会"永远"阻塞,而是清空执行器的工作队列并关闭后者:

def shutdown(self):
"""Shuts down the scheduler and immediately end all pending callbacks.
"""
# Drop all pending item from the executor. Without this, the executor
# will block until all pending items are complete, which is
# undesirable.
try:
while True:
self._executor._work_queue.get(block=False)
except queue.Empty:
pass
self._executor.shutdown()

这解释了为什么在提供的代码示例中没有发送ack—回调休眠30秒,而流pull future仅在大约10秒后被取消。ack没有发送到服务器。

混杂。评论

  • 由于流式拉取是一个长时间运行的操作,我们希望在主线程中阻塞,以免过早退出。这是通过阻塞流拉未来结果来完成的:
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()

或在预设超时后:

try:
streaming_pull_future.result(timeout=123)
except concurrent.futures.TimeoutError:
streaming_pull_future.cancel()
  • ACK请求是尽力而为的。即使关闭阻塞并等待用户回调完成,仍然不能保证消息实际上会被识别(例如,请求可能会在网络中丢失)。

  • Re:关于重新发送消息的关注("所以下次运行相同的消息再次返回">) -这实际上是设计的。后端将尽力至少交付每条消息一次,因为请求可能会丢失。这包括来自订阅者的ACK请求,因此订阅者应用程序的设计必须考虑到幂等性。

最新更新