接收消息 ID(未来)是否表示消息已在 PubSub 中发送?



我正在使用回调函数向Google pubsub发送消息,该函数从将来读回消息ID。使用以下代码:

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import time
from google.cloud import pubsub_v1
# ...
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
def get_callback(f, data):
def callback(f):
try:
print(f.result())
except:  # noqa
print('Please handle {} for {}.'.format(f.exception(), data))
return callback
for i in range(10):
data = str('message')
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path, data=data.encode('utf-8')  # data must be a bytestring.
)
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))
print('Published message with error handler.')

我成功收到消息 ID 并出现错误/异常,但是我发现某些消息未读入 pubsub(从 GCP 控制台查看它们时(。

消息 ID 打印在回调函数内的行print(f.result())中。

我的问题是:假设消息在收到消息 ID 后成功发送到 Pubsub 是否安全?

如果是这样,"丢弃"消息的原因可能是什么?

如果发布已成功返回消息 ID,那么是的,Cloud Pub/Sub 保证将消息传递给订阅者。如果您没有看到该消息,则有几种情况可能导致此问题:

  1. 发布消息时,订阅不存在。如果订阅是在消息发布之前创建的,则 Cloud Pub/Sub 仅向订阅者传递消息。
  2. 订阅
  3. 的另一个订阅者已在运行并收到消息。如果您在订阅者启动并运行时使用 GCP 控制台获取消息,则订阅者可能收到了消息。
  4. 如果您在控制台上收到过一次消息,然后重新加载以再次获取消息,则在确认截止时间(默认为 10 秒(过后,您可能不会再次看到该消息。
  5. 单个拉取请求(GCP 控制台的"查看消息"功能(可能不足以检索消息。如果您多次单击它或启动基本订阅者,您可能会看到该消息。

最新更新