我正在使用回调函数向Google pubsub发送消息,该函数从将来读回消息ID。使用以下代码:
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import time
from google.cloud import pubsub_v1
# ...
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
def get_callback(f, data):
def callback(f):
try:
print(f.result())
except: # noqa
print('Please handle {} for {}.'.format(f.exception(), data))
return callback
for i in range(10):
data = str('message')
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path, data=data.encode('utf-8') # data must be a bytestring.
)
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))
print('Published message with error handler.')
我成功收到消息 ID 并出现错误/异常,但是我发现某些消息未读入 pubsub(从 GCP 控制台查看它们时(。
消息 ID 打印在回调函数内的行print(f.result())
中。
我的问题是:假设消息在收到消息 ID 后成功发送到 Pubsub 是否安全?
如果是这样,"丢弃"消息的原因可能是什么?
如果发布已成功返回消息 ID,那么是的,Cloud Pub/Sub 保证将消息传递给订阅者。如果您没有看到该消息,则有几种情况可能导致此问题:
- 发布消息时,订阅不存在。如果订阅是在消息发布之前创建的,则 Cloud Pub/Sub 仅向订阅者传递消息。 订阅
- 的另一个订阅者已在运行并收到消息。如果您在订阅者启动并运行时使用 GCP 控制台获取消息,则订阅者可能收到了消息。
- 如果您在控制台上收到过一次消息,然后重新加载以再次获取消息,则在确认截止时间(默认为 10 秒(过后,您可能不会再次看到该消息。
- 单个拉取请求(GCP 控制台的"查看消息"功能(可能不足以检索消息。如果您多次单击它或启动基本订阅者,您可能会看到该消息。