I'm trying to catch the following exception, without any luck.
ERROR 2019-03-04 16:31:50,522 _plugin_wrapping 15 140108141426432 AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f6dd8330b70>" raised exception!
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/grpc/_plugin_wrapping.py", line 79, in __call__
    callback_state, callback))
  File "/usr/local/lib/python3.6/site-packages/google/auth/transport/grpc.py", line 77, in __call__
    callback(self._get_authorization_headers(context), None)
  File "/usr/local/lib/python3.6/site-packages/google/auth/transport/grpc.py", line 65, in _get_authorization_headers
    headers)
  File "/usr/local/lib/python3.6/site-packages/google/auth/credentials.py", line 122, in before_request
    self.refresh(request)
  File "/usr/local/lib/python3.6/site-packages/google/oauth2/service_account.py", line 322, in refresh
    request, self._token_uri, assertion)
  File "/usr/local/lib/python3.6/site-packages/google/oauth2/_client.py", line 145, in jwt_grant
    response_data = _token_endpoint_request(request, token_uri, body)
  File "/usr/local/lib/python3.6/site-packages/google/oauth2/_client.py", line 111, in _token_endpoint_request
    _handle_error_response(response_body)
  File "/usr/local/lib/python3.6/site-packages/google/oauth2/_client.py", line 61, in _handle_error_response
    error_details, response_body)
google.auth.exceptions.RefreshError: ('invalid_grant: Invalid JWT Signature.', '{\n "error": "invalid_grant",\n "error_description": "Invalid JWT Signature."\n}')
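I'm using Google Cloud pubsub_v1, and this part is the subscriber. I want to catch this exception so that the client can request credentials again. The credentials-request API we built works fine. To trigger this error, I manually deleted the key from the SA using the Google Cloud Console UI and commented out the startup function call that fetches the credentials, so the subscriber is trying to use the old ones.
As for the code, I have what I think is a pretty basic subscriber.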
from google.cloud import pubsub_v1
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound
from google.auth.exceptions import RefreshError
...
def startSubscribe(self):
    project, subscription, gauth = self.decryptCreds()
    credentials = service_account.Credentials.from_service_account_info(gauth)
    subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
    subscription_path = subscriber.subscription_path(project, subscription)
    self.future = subscriber.subscribe(subscription_path, callback=self.callback)
    LOGGER.info('Listening for messages on {}'.format(subscription_path))
    while True:
        try:
            LOGGER.info("Calling future.result()")
            self.future.result()
            self.future.exception()
        # TODO: Need to figure out permission error, etc... and handle them properly. This is just a catch all until
        # I figure out more about gcp subscribe and futures
        except RefreshError as e:
            # Catch permission exception and get creds again
            LOGGER.info("Caught the correct error")
            LOGGER.error(e, exc_info=True)
        except Exception as e:
            LOGGER.info("Must be something else")
            LOGGER.error(e, exc_info=True)
        LOGGER.info("Looping")
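The startSubscribe function is also wrapped in a try/except, but no error shows up there either.
Does anyone have experience with this, know how to catch this exception, or have any insight into how to check the credentials so that I can request new ones?
Thanks.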
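We contacted Google, and they reported that there is a bug in the asynchronous (futures-based) version.
So I modified our code to pull synchronously instead.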
subscriber = None
while self.alive:
    try:
        if subscriber is None:
            project, subscription, gauth = self.decryptCreds()
            credentials = (service_account.Credentials
                           .from_service_account_info(gauth))
            subscriber = (pubsub_v1
                          .SubscriberClient(credentials=credentials))
            subscription_path = (subscriber
                                 .subscription_path(project,
                                                    subscription))
        pull_response = subscriber.pull(subscription_path,
                                        max_messages=NUM_MESSAGES,
                                        timeout=60,
                                        retry=None)
        for msg in pull_response.received_messages:
            msg_id = msg.message.message_id
            try:
                payload = json.loads(
                    base64.b64decode(msg.message.data.decode('utf-8'))
                    .decode('utf-8'))
                LOGGER.info("Got Message")
                LOGGER.debug(payload)
                LOGGER.debug(type(payload))
                # Removed for privacy...
            except Exception as e:
                LOGGER.info(e, exc_info=True)
            # Here we are ack-ing the message no matter what
            # because if the message itself is bad, there is no way
            # to tell the publisher and we do not want bad messages
            # continually sent.
            subscriber.acknowledge(subscription_path, [msg.ack_id])
    except NotFound as nf:
        LOGGER.error(nf, exc_info=True)
        try:
            # If we get here try to get the credentials before remaking
            # the subscriber
            self.cpps.doGetCreds()
            # Close the channel first to make sure not file handlers
            # left open
            try:
                subscriber.api.transport._channel.close()
            except Exception as e:
                LOGGER.error(e, exc_info=True)
            subscriber = None
        except Exception as e:
            LOGGER.error(
                'Exception here may have been crashing firmware.')
            LOGGER.error(e, exc_info=True)
    except (RefreshError, ServiceUnavailable) as re:
        LOGGER.error(re, exc_info=True)
    except RetryError as e:
        LOGGER.error(e, exc_info=True)
    except Exception as e:
        LOGGER.error(e, exc_info=True)
    time.sleep(10)
LOGGER.info("Exiting Cloud Pull")
try:
    subscriber.api.transport._channel.close()
except Exception as e:
    LOGGER.error(e, exc_info=True)
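One important thing to note here, because it is not described in the documentation, is the call to subscriber.api.transport._channel.close(). Without it, a large number of file handles are left open.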
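Since the same close call appears in both the error path and the shutdown path, it could be pulled into a small helper. This is only a sketch: `close_subscriber_channel` is a hypothetical name, and `_channel` is a private attribute taken from the code above, so it may not exist in other versions of the client library. That is why the helper treats any failure as non-fatal.

```python
import logging

LOGGER = logging.getLogger(__name__)


def close_subscriber_channel(subscriber):
    """Best-effort close of the gRPC channel behind a subscriber client.

    Returns True if the channel was closed, False otherwise.
    """
    if subscriber is None:
        return False
    try:
        # Private attribute path copied from the code above; it is not a
        # documented API and may differ between library versions.
        subscriber.api.transport._channel.close()
        return True
    except Exception as exc:
        # Log and carry on: failing to close must not kill the pull loop.
        LOGGER.error(exc, exc_info=True)
        return False
```

The return value is there only so a caller can log or count failed closes; it can be ignored.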
Basically, just switch to synchronous pull and make sure to close the channel.
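Stripped of the Pub/Sub details, the pattern is roughly the one sketched below: pull in a loop, and when a credential-type error comes up, close and drop the client so the next pass rebuilds it from fresh credentials. Every parameter here (`make_subscriber`, `pull_once`, `close`, `refresh_creds`, `is_alive`, `auth_errors`) is a hypothetical hook invented for illustration, not part of any Google library. Unlike the code above, this sketch lets errors outside `auth_errors` propagate.

```python
import logging
import time

LOGGER = logging.getLogger(__name__)


def pull_loop(make_subscriber, pull_once, close, refresh_creds,
              is_alive, auth_errors=(Exception,), delay=10):
    """Pull synchronously, rebuilding the client after a credential error.

    All arguments are caller-supplied hooks (hypothetical names):
      make_subscriber() -> client    pull_once(client) -> None
      close(client) -> None          refresh_creds() -> None
      is_alive() -> bool             auth_errors: exception types to recover from
    """
    subscriber = None
    try:
        while is_alive():
            try:
                if subscriber is None:
                    subscriber = make_subscriber()
                pull_once(subscriber)
            except auth_errors as exc:
                LOGGER.error(exc, exc_info=True)
                # Drop the stale client so the next pass builds a new one
                # from freshly fetched credentials.
                if subscriber is not None:
                    close(subscriber)
                subscriber = None
                refresh_creds()
                time.sleep(delay)
    finally:
        # Always release the channel on the way out.
        if subscriber is not None:
            close(subscriber)
```

With the Pub/Sub client, `auth_errors` might be something like `(RefreshError, NotFound)`, and `close` would be the channel close call mentioned above.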
I hope this helps, @progirlxoxo.
Oh, and the relevant Google imports are:
from google.cloud import pubsub_v1
from google.oauth2 import service_account
from google.auth.exceptions import RefreshError
from google.api_core.exceptions import RetryError, ServiceUnavailable, NotFound