Google PubSub returns google.gax.errors.GaxError: GaxError RPC fai



We are trying to do a simple publish to an existing topic after an event occurs on a distributed system.

The code looks like this:

try:
    dat = data.encode('utf-8')
    topic.publish(dat)
except:
    # <code to recover>
    pass

If we catch everything with the except clause and print the traceback, we get:

google.gax.errors.GaxError: GaxError(RPC failed, caused by <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, {"created":"@1478711654.067744009","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1478711654.067706801","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]})>

(The full error is below.)

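Looking at http://gcloud-python.readthedocs.io/en/latest/pubsub-topic.html#google.cloud.pubsub.topic.Topic.publish, it seems this GaxError is not something we should have to handle. However, if we do catch the error and retry with exponential backoff, the publish usually succeeds on the second attempt.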

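I found this discussion, which talks about a potential bug in _gax_python, but it does not seem related. Do you think we are doing something wrong?

Full error: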

Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/pp/pp/pp/process/uploader.py", line 145, in upload_thread
    topic.publish(byte_string)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/cloud/pubsub/topic.py", line 257, in publish
    message_ids = api.topic_publish(self.full_name, [message_data])
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/cloud/pubsub/_gax.py", line 165, in topic_publish
    options=options)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/cloud/gapic/pubsub/v1/publisher_api.py", line 289, in publish
    return self._publish(request, options)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/gax/api_callable.py", line 481, in inner
    return api_caller(api_call, this_settings, request)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/gax/api_callable.py", line 158, in inner
    return a_func(request, **kwargs)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/gax/api_callable.py", line 434, in inner
    errors.create_error('RPC failed', cause=exception))
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/gax/api_callable.py", line 430, in inner
    return a_func(*args, **kwargs)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/google/gax/api_callable.py", line 64, in inner
    return a_func(*updated_args, **kwargs)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/grpc/_channel.py", line 481, in __call__
    return _end_unary_response_blocking(state, False, deadline)
  File "/home/pp/.virtualenvs/cv/lib/python3.5/site-packages/grpc/_channel.py", line 432, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
google.gax.errors.GaxError: GaxError(RPC failed, caused by <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, {"created":"@1478711654.067744009","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1478711654.067706801","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]})>

It looks like the relevant discussion you are looking for is issue 2683, "Frequent gRPC StatusCode.UNAVAILABLE errors".

You are not doing anything wrong, and catching the exception and retrying appears to be the most appropriate solution for now.
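As a rough illustration of the catch-and-retry approach, here is a minimal sketch of exponential backoff with jitter. The helper name `call_with_backoff` and all of its parameters are hypothetical, not part of any Google library, and the delays are arbitrary starting points.

```python
import random
import time


def call_with_backoff(func, retryable=(Exception,), max_attempts=5,
                      base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call func(), retrying on `retryable` errors with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller see the error
            # The delay doubles on each attempt, is capped at max_delay,
            # and is scaled by random jitter so clients do not retry in step.
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(delay * random.uniform(0.5, 1.0))
```

With the code from the question, this could be used as `call_with_backoff(lambda: topic.publish(dat), retryable=(GaxError,))`, with `GaxError` imported from `google.gax.errors`, the module named in the traceback.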

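If the topic is a global variable, it stops producing the errors. Make the topic a class variable and instantiate it only once, that is, call this line only once: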

topic = pubsub.Client().topic(name)
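One way to guarantee that the line runs only once, even with several uploader threads, is to cache the topic on a class. This is a minimal sketch: the `Uploader` class, `get_topic`, and the `make_topic` factory are hypothetical names used for illustration.

```python
import threading


class Uploader(object):
    """Holds one topic object that every instance and thread shares."""

    _topic = None                 # class variable: created at most once
    _lock = threading.Lock()

    @classmethod
    def get_topic(cls, make_topic):
        # Double-checked locking so concurrent threads build a single topic.
        if cls._topic is None:
            with cls._lock:
                if cls._topic is None:
                    cls._topic = make_topic()
        return cls._topic
```

Here `make_topic` would be something like `lambda: pubsub.Client().topic(name)`; every later call returns the same cached object.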

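Also, this seems to work only on Python 2.7; on Python 3.6, retrying merely eases the pain a little.

Disabling gRPC on Python 3.6 does the trick. This can be done by setting an environment variable: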

ENV GOOGLE_CLOUD_DISABLE_GRPC=true
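The line above is Dockerfile syntax. Outside a container, the same variable could be set from Python itself. This is a sketch resting on the assumption that the library reads the variable when it is imported, so the assignment has to run first.

```python
import os

# Assumption: the library checks this variable when it is imported,
# so it has to be set before any `google.cloud` import runs.
os.environ.setdefault("GOOGLE_CLOUD_DISABLE_GRPC", "true")

# from google.cloud import pubsub  # import only after the variable is set
```

`setdefault` leaves the variable alone if the environment already defines it.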

I managed to get a "not so pretty" workaround by overriding google.cloud.pubsub_v1.subscriber.policy.thread.Policy.on_exception, using a policy that mirrors the way deadline_exceeded is already handled:

from google.cloud.pubsub_v1.subscriber.policy.thread import Policy
import grpc


class UnavailableHackPolicy(Policy):
    def on_exception(self, exception):
        """
        There is an issue on the grpc channel that raises an UNAVAILABLE exception now and
        then. Until that issue is fixed we need to protect our consumer thread from breaking.
        https://github.com/GoogleCloudPlatform/google-cloud-python/issues/2683
        """
        unavailable = grpc.StatusCode.UNAVAILABLE
        # Not every exception has a code() method, so fall back to None.
        if getattr(exception, 'code', lambda: None)() == unavailable:
            print("¡OrbitalHack! - {}".format(exception))
            # Swallow the error so the consumer thread keeps running.
            return
        return super(UnavailableHackPolicy, self).on_exception(exception)

In the function that receives messages, I have code like this:

from google.cloud import pubsub

subscriber = pubsub.SubscriberClient(policy_class=UnavailableHackPolicy)
subscription_path = subscriber.subscription_path(project, subscription_name)
subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)

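The catch is that when the resource is genuinely unavailable, we will not know. Still, we will use this workaround until the gRPC team manages to fix the issue.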
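One possible way to soften that drawback is to swallow only sporadic UNAVAILABLE errors and stop swallowing them once too many arrive in a short period. The sketch below is an illustration only: `UnavailableTracker`, its thresholds, and its method name are hypothetical and would need tuning against real traffic.

```python
import time


class UnavailableTracker(object):
    """Tolerates sporadic UNAVAILABLE errors but flags a sustained run."""

    def __init__(self, max_errors=5, window_seconds=60.0, clock=time.time):
        self.max_errors = max_errors
        self.window_seconds = window_seconds
        self.clock = clock
        self._timestamps = []

    def should_swallow(self):
        """Record one UNAVAILABLE error; return False once there are too many."""
        now = self.clock()
        cutoff = now - self.window_seconds
        # Keep only the errors that happened inside the sliding window.
        self._timestamps = [t for t in self._timestamps if t > cutoff]
        self._timestamps.append(now)
        return len(self._timestamps) <= self.max_errors
```

Inside `UnavailableHackPolicy.on_exception`, the early `return` could then be taken only while `tracker.should_swallow()` is true, so a real outage still falls through to the parent class's handler.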