如何在 Lambda 中将 Athena 查询结果保存为 Python 字典



我使用的是 Python 3.6。

import logging
from typing import Any, Dict, Iterator, List, Optional

import boto3
from retry import retry
# Module-level logger named after this module. Its level is set to INFO, so
# this logger drops DEBUG records and passes INFO and above to its handlers.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
class FailedExecution(Exception):
    """Signals that a query execution failed.

    Constructing the exception logs ``message`` at ERROR level and stores it
    on ``self.message``.

    Args:
        message: Human-readable description of the failure.
    """

    def __init__(self, message: str) -> None:
        self.message = message
        # log.exception() is only meaningful inside an ``except`` block. This
        # constructor runs outside one, where it would append a bogus
        # "NoneType: None" traceback to every record, so log a plain error.
        log.error(self.message)
        super().__init__(self.message)
class ExecutionNotReady(Exception):
    """Signals that a query execution has not finished yet.

    Constructing the exception logs ``message`` at WARNING level and stores it
    on ``self.message``.

    Args:
        message: Human-readable description of the pending execution.
    """

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.message = message
        log.warning(message)
class AthenaCall(object):
    """Run SQL queries on Amazon Athena and return the rows as dictionaries.

    Args:
        region: AWS region name used to create the Athena client.
        database: Database passed as the query execution context.
        s3_output: Output location passed in the result configuration.
        workgroup: Athena workgroup the queries run in.
    """

    def __init__(
        self,
        region: str,
        database: str,
        s3_output: str,
        workgroup: str = 'primary',
    ) -> None:
        self.region = region
        self.database = database
        self.s3_output = s3_output
        self.workgroup = workgroup
        self.client = boto3.client('athena', region_name=self.region)

    @retry(ExecutionNotReady, tries=10, delay=2)
    def _get_query_result(self, query_execution_id: str, next_token: Optional[str]) -> Dict[str, Any]:
        """Fetch one page of results once the query has succeeded.

        The ``retry`` decorator re-invokes this method while it raises
        ``ExecutionNotReady`` (10 tries, 2 second delay); any other exception
        propagates immediately.

        Args:
            query_execution_id: Id of the query execution to read.
            next_token: Pagination token of the page to fetch, or ``None``
                for the first page.

        Returns:
            The raw ``get_query_results`` response for the requested page.

        Raises:
            FailedExecution: The query ended in the FAILED or CANCELLED state.
            ExecutionNotReady: The query is in any other non-succeeded state.
        """
        execution = self.client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        status = execution['QueryExecution']['Status']
        state = status['State']

        if state == 'SUCCEEDED':
            params = {
                'QueryExecutionId': query_execution_id,
                'MaxResults': 1000,
            }
            if next_token:
                params['NextToken'] = next_token
            return self.client.get_query_results(**params)

        # CANCELLED is terminal just like FAILED: retrying can never succeed,
        # so fail fast instead of exhausting the retries.
        if state in ('FAILED', 'CANCELLED'):
            # StateChangeReason may be absent from the status; fall back to a
            # generated message rather than failing with KeyError.
            reason = status.get('StateChangeReason') or (
                'QueryExecution {} ended in state {}.'.format(query_execution_id, state)
            )
            raise FailedExecution(reason)

        raise ExecutionNotReady('QueryExecution {} is not ready yet.'.format(query_execution_id))

    @staticmethod
    def _get_data(row: Dict[str, Any]) -> List[Optional[str]]:
        """Flatten one result row into a list of cell values.

        Each entry of ``row["Data"]`` is a mapping; its values are appended to
        the result, and an empty mapping contributes a single ``None``.
        """
        values = []
        for datum in row["Data"]:
            values.extend(datum.values() or [None])
        return values

    def _create_dict_from_athena_result(
        self, response: Dict[str, Any], _key_result: List[str]
    ) -> Iterator[Dict[str, Any]]:
        """Yield one ``{column: value}`` dict per row of ``response``.

        Every key of ``_key_result`` is present in each yielded dict; columns
        without a value in the row stay ``None``. Values beyond the number of
        keys are ignored. The rows of ``response`` are read, not consumed.
        """
        for row in response["ResultSet"]["Rows"]:
            record = dict.fromkeys(_key_result)
            record.update(zip(_key_result, self._get_data(row)))
            yield record

    def _get_query_response(self, query_execution_id: str) -> List[Dict[str, Any]]:
        """Collect every result page of a query into a list of row dicts.

        The first row of the first page is taken as the header row and
        supplies the dictionary keys.
        NOTE(review): presumably only SELECT-style results start with a
        header row — confirm before using this for other statement types.

        Returns:
            All data rows as dicts, or an empty list when the first page
            contains no rows at all.
        """
        page = 1
        log.info(f"Getting page {page}")
        response = self._get_query_result(query_execution_id, None)

        data = []
        rows = response["ResultSet"]["Rows"]
        if not rows:
            # No header row to read keys from; nothing to convert.
            log.info(f"Total data: {len(data)}")
            return data

        # Remove the header row so only data rows are converted below.
        keys = self._get_data(rows.pop(0))

        while True:
            data.extend(self._create_dict_from_athena_result(response, keys))
            next_token = response.get("NextToken")
            if not next_token:
                break
            page += 1
            log.info(f"Getting page {page}")
            response = self._get_query_result(query_execution_id, next_token)

        log.info(f"Total data: {len(data)}")
        return data

    def run_query(self, query: str) -> List[Dict[str, Any]]:
        """Start ``query`` on Athena and return its rows as dictionaries.

        Args:
            query: SQL text to execute.

        Returns:
            One dict per data row, keyed by the values of the header row.

        Raises:
            FailedExecution: The query ended in the FAILED or CANCELLED state.
            ExecutionNotReady: The query did not finish before the retries of
                the result fetch were exhausted.
        """
        log.info(f'Running query: {query}')
        query_execution = self.client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': self.database},
            ResultConfiguration={'OutputLocation': self.s3_output},
            WorkGroup=self.workgroup,
        )
        return self._get_query_response(query_execution["QueryExecutionId"])

最新更新