How to generate an IAM access token for Analytics Engine requests using Python



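The Analytics Engine documentation links to a procedure for generating an IAM access token with the CLI, but I need to generate the token with an API call. This is the CLI approach: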

bx api https://api.ng.bluemix.net
bx login
<enter your credentials>
<If you are part of multiple IBM Cloud accounts, you'll be asked to choose an account for the current session. Also, you'll need to choose an organization and space in IBM Cloud.>
bx iam oauth-tokens

The documentation also says that the Cloud Foundry API is deprecated. How can I generate an IAM access token?

Here is the code I ended up writing...

Some utility classes for logging and exceptions:

import requests
import json
from datetime import datetime, timedelta
import logging
import os 
class Logger:
    def __init__(self):
        # basicConfig already attaches a StreamHandler to the root logger
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(format=log_format)
    def get_logger(self, clazz):
        logger = logging.getLogger(clazz)
        logger.setLevel(os.getenv("LOG_LEVEL", logging.INFO))
        return logger
class CloudFoundryException(Exception):
    def __init__(self, message, *args):
        self.message = message
        super(CloudFoundryException, self).__init__(message, *args) 

Then a class that does the main work:

class CloudFoundryAPI(object):
    def __init__(self, api_key=None, api_key_filename=None, api_endpoint='https://api.ng.bluemix.net', provision_poll_timeout_mins=30):
        self.log = Logger().get_logger(self.__class__.__name__)
        self.provision_poll_timeout_mins = provision_poll_timeout_mins 
        assert api_key is not None or api_key_filename is not None, "You must provide a value for api_key or for api_key_filename"
        # allow tests to override the api_key_filename parameter
        if getattr(CloudFoundryAPI, 'api_key_filename', None) is not None:
            api_key_filename = CloudFoundryAPI.api_key_filename
        if api_key_filename is not None:
            try:
                with open(api_key_filename, 'r') as api_file:
                    d = json.load(api_file)
                    try:
                        self.api_key = d['apikey']
                    except KeyError:
                        # The attribute name used to be "apiKey"
                        self.api_key = d['apiKey']
            except Exception:
                self.log.error('Error retrieving "apiKey" from file {}'.format(api_key_filename))
                raise
        else:
            self.api_key = api_key
        self.api_endpoint = api_endpoint
        self.info = self._get_info()
    def auth(self):
        self.log.debug('Authenticating to CloudFoundry')
        url = self.info['authorization_endpoint'] + '/oauth/token'
        headers = { 
                    'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8',
                    'Accept': 'application/x-www-form-urlencoded;charset=utf-8', 
                    'Authorization': 'Basic Y2Y6'  # base64 of "cf:" (client id "cf", empty secret)
                }
        data = 'grant_type=password&username=apikey&password={}'.format(self.api_key)
        try:
            response = requests.post(url, headers=headers, data=data)
            response.raise_for_status()
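        except requests.exceptions.RequestException as e:
            # e.response is None when the request never reached the server
            body = e.response.text if e.response is not None else str(e)
            self.log.error('Cloud Foundry Auth Response: ' + body)
            # TODO we should define a custom application exception for this
            raise
        self.auth_token = response.json()
        # expires_in is the token lifetime in seconds
        self.expires_at = datetime.now() + timedelta(seconds=self.auth_token['expires_in'])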
        self.log.debug('Authenticated to CloudFoundry')
    def oidc_token(self):
        self.log.debug('Retrieving IAM token')
        url = 'https://iam.bluemix.net/identity/token'
        # Passing a dict makes requests form-encode the body and set the Content-Type header
        data = {
            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
            'apikey': self.api_key
        }
        try:
            response = requests.post(url, data=data)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # e.response is None when the request never reached the server
            body = e.response.text if e.response is not None else str(e)
            self.log.debug('IAM token response: ' + body)
            raise
        # Store the token under a different name so it does not shadow this method
        self._oidc_token = response.json()
        # expires_in is the token lifetime in seconds
        self.oidc_expires_at = datetime.now() + timedelta(seconds=self._oidc_token['expires_in'])
        self.log.debug('Retrieved IAM token')
        return self._oidc_token
    def get_auth_token(self):
        if not hasattr(self, 'auth_token') or not hasattr(self, 'expires_at') or datetime.now() > self.expires_at:
            self.auth()
        return self.auth_token
    def get_oidc_token(self):
        if not hasattr(self, '_oidc_token') or not hasattr(self, 'oidc_expires_at') or datetime.now() > self.oidc_expires_at:
            self.oidc_token()
        return self._oidc_token
    def _request_headers(self):
        auth_token = self.get_auth_token()
        access_token = auth_token['access_token']
        token_type = auth_token['token_type']
        headers = {
            'accept': 'application/json',
            'authorization': '{} {}'.format(token_type, access_token),
            'cache-control': 'no-cache', 
            'content-type': 'application/json'
            }
        return headers
    def _request(self, url, http_method='get', data=None, description='', create_auth_headers=True):
        if create_auth_headers:
            headers = self._request_headers()
        else:
            headers = {}
        try:
            if http_method == 'get':
                response = requests.get(url, headers=headers)
            elif http_method == 'post':
                response = requests.post(url, headers=headers, data=json.dumps(data))
            elif http_method == 'delete':
                response = requests.delete(url, headers=headers)
            else:
                raise ValueError('Unsupported http_method: {}'.format(http_method))
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # e.response is None when the request never reached the server
            status = e.response.status_code if e.response is not None else 'n/a'
            body = e.response.text if e.response is not None else str(e)
            self.log.error('{} : {} {} : {} {}'.format(description, http_method, url, status, body))
            raise CloudFoundryException(message=body)
        try:
            self.log.debug('{} : {} {} : {} {}'.format(description, http_method, url, response.status_code, json.dumps(response.json())))
        except ValueError:
            self.log.debug('{} : {} {} : {} {}'.format(description, http_method, url, response.status_code, response.text))
        return response
    def _get_info(self):
        url = '{}/v2/info'.format(self.api_endpoint)
        response = self._request(url=url, http_method='get', description='_get_info', create_auth_headers=False)
        return response.json()
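
The expiry check used by get_auth_token and get_oidc_token can also be isolated into a small helper. The following is only a sketch of that idea, not part of the class above: the name TokenCache, the margin_seconds parameter, and the injectable clock are all hypothetical. It assumes the token is a dict whose 'expires_in' field is a lifetime in seconds.

```python
from datetime import datetime, timedelta


class TokenCache:
    """Cache a token and refresh it shortly before it expires (hypothetical helper)."""

    def __init__(self, fetch, margin_seconds=60, now=datetime.now):
        self._fetch = fetch    # callable returning a dict with 'expires_in' (seconds)
        self._margin = timedelta(seconds=margin_seconds)
        self._now = now        # injectable clock, handy for tests
        self._token = None
        self._expires_at = None

    def get(self):
        # Fetch on first use, or once the cached token is within the
        # safety margin of its expiry time.
        if self._token is None or self._now() >= self._expires_at - self._margin:
            self._token = self._fetch()
            lifetime = timedelta(seconds=self._token['expires_in'])
            self._expires_at = self._now() + lifetime
        return self._token
```

Any callable that returns the token JSON as a dict can be passed as fetch. Refreshing slightly early avoids sending a token that expires while the request is in flight.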

You can then use the class like this:

cf = CloudFoundryAPI(api_key="xxxx") # or pass api_key_filename
cf.get_auth_token() # get UAA token
cf.get_oidc_token() # get IAM token
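
If only the IAM token is needed, the Cloud Foundry parts can be skipped. Below is a minimal standalone sketch of the same API key exchange, written with the standard library so it runs without extra dependencies. The function names are hypothetical, the endpoint is the one used in the class above (newer IBM Cloud documentation may list a different host), and the Bearer scheme in bearer_header is an assumption about how the token is sent afterwards.

```python
import json
import urllib.parse
import urllib.request

# Endpoint copied from the class above; check the current docs before relying on it.
IAM_TOKEN_URL = 'https://iam.bluemix.net/identity/token'


def build_iam_token_request(api_key, url=IAM_TOKEN_URL):
    """Build (but do not send) the form-encoded POST that exchanges an API key for a token."""
    form = urllib.parse.urlencode({
        'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
        'apikey': api_key,
    }).encode('utf-8')
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json',
    }
    return urllib.request.Request(url, data=form, headers=headers, method='POST')


def fetch_iam_token(api_key, timeout=30):
    """Send the request and return the parsed JSON response as a dict."""
    request = build_iam_token_request(api_key)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


def bearer_header(token):
    """Turn the token dict into an Authorization header (assumes a Bearer token)."""
    return {'Authorization': 'Bearer {}'.format(token['access_token'])}
```

Calling fetch_iam_token("xxxx") performs the network request; bearer_header can then be applied to the returned dict when calling the Analytics Engine API.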
