我正忙于使用 Google Cloud 使用 PubSub,我已经实现了 Google PubSub 订阅者,并且堆栈按预期工作,但是我有时会遇到一个问题,即我的回调发生超时,然后订阅者线程似乎挂起,如下所示:
DEBUG 2019-08-14 09:49:13,130 google.cloud.pubsub_v1.subscriber.policy.thread.dispatch_callback:287- Handling 1 batched requests
DEBUG 2019-08-14 09:49:15,441 google.cloud.pubsub_v1.subscriber.policy.base.maintain_leases:347- The current p99 value is 10 seconds.
DEBUG 2019-08-14 09:49:15,442 google.cloud.pubsub_v1.subscriber.policy.base.maintain_leases:397- Snoozing lease management for 2.925854 seconds.
ERROR 2019-08-14 09:49:16,315 models.subscribe:104- Subscriber Timeout occurred Timed out waiting for result.
这是特定的错误:
Timed out waiting for result.
这在谷歌云文档中有描述 这里.
我的实际订阅者代码和异常处理如下:
protocol = self.get_subscriber_protocol()
logger.info("Starting subscriber %s on topic %s", os.environ.get('PUBSUB_CLIENT_ID'), topic)
future = protocol.subscribe(topic,
callback=callback,
always_raise=False,
create_topic=True,
exception_handler=exception_handler)
try:
future.result(timeout=120)
except TimeoutError as te:
logger.error("Subscriber Timeout occurred {}".format(te))
except Exception as e:
logger.error(e)
据我了解,如果发生超时,则订阅者应记录一条消息并丢弃该消息,但它似乎会阻止线程。
我只是想知道是否有人经历过这种情况,处理此案的最佳方法是什么?
谢谢!
当你调用protocol.subscribe()
时,你是否检索了subscribe()
在 Pub/Sub Python 客户端库中返回的未来?如果是这样,则不应在将来超时的情况下调用result()
。https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/client.html#google.cloud.pubsub_v1.subscriber.client.Client.subscribe