# Using Python 3.6? (The f-strings below require Python 3.6 or newer.)
import logging
from typing import Any, Dict, List, Optional

import boto3
from retry import retry
# Module-level logger, explicitly set to INFO so this module's info() records
# are not filtered out at the logger level.
log = logging.getLogger(name=__name__)
log.setLevel(level=logging.INFO)
class FailedExecution(Exception):
    """Raised when an Athena query execution ends unsuccessfully.

    The message is stored on ``self.message`` and logged at ERROR level
    when the exception is constructed.
    """

    def __init__(self, message: str) -> None:
        self.message = message
        # log.error rather than log.exception: this exception is typically
        # created at a raise site, not inside an ``except`` block. With no
        # exception being handled, log.exception would append a meaningless
        # "NoneType: None" traceback to the record.
        log.error(self.message)
        super().__init__(self.message)
class ExecutionNotReady(Exception):
    """Raised when an Athena query execution is not ready yet.

    The message is stored on ``self.message`` and logged as a warning
    when the exception is constructed.
    """

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.message = message
        log.warning(message)
class AthenaCall(object):
    """Run SQL on Amazon Athena and return the result set as a list of dicts.

    Each returned dict maps a column name (taken from the first row of the
    first result page) to that row's cell value, or None when the cell
    carries no value.
    """

    def __init__(self, region: str, database: str, s3_output: str, workgroup: str = 'primary') -> None:
        """Store the query settings and create the boto3 Athena client.

        :param region: AWS region for the Athena client.
        :param database: database used as the QueryExecutionContext.
        :param s3_output: OutputLocation where Athena writes query results.
        :param workgroup: Athena workgroup the queries run in.
        """
        self.region = region
        self.database = database
        self.s3_output = s3_output
        self.workgroup = workgroup
        self.client = boto3.client('athena', region_name=self.region)

    @retry(ExecutionNotReady, tries=10, delay=2)
    def _get_query_result(self, query_execution_id: str, next_token: Optional[str]) -> Dict[str, Any]:
        """Return one page (at most 1000 rows) of a finished query's results.

        The execution state is checked on every call. While the query is not
        finished, ExecutionNotReady is raised and the ``retry`` decorator calls
        this method again (up to 10 tries, 2 seconds apart).

        :param query_execution_id: id returned by start_query_execution.
        :param next_token: pagination token from the previous page, or None
            (or empty) to fetch the first page.
        :returns: the raw ``get_query_results`` response.
        :raises FailedExecution: if the query ended in FAILED or CANCELLED.
        :raises ExecutionNotReady: if the query is still unfinished after the
            last retry.
        """
        status = self.client.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            request = {'QueryExecutionId': query_execution_id, 'MaxResults': 1000}
            if next_token:
                request['NextToken'] = next_token
            return self.client.get_query_results(**request)
        if state in ('FAILED', 'CANCELLED'):
            # CANCELLED is terminal as well. Treating it as "not ready" would
            # spend the whole retry budget and then raise the wrong error.
            reason = status.get(
                'StateChangeReason',
                'QueryExecution {} finished with state {}.'.format(query_execution_id, state),
            )
            raise FailedExecution(reason)
        raise ExecutionNotReady('QueryExecution {} is not ready yet.'.format(query_execution_id))

    @staticmethod
    def _get_data(row: Dict[str, Any]) -> List[Optional[str]]:
        """Flatten one Athena result row into a list of its cell values.

        Every value of each cell dict in ``row["Data"]`` is appended. A cell
        with no values (an empty dict) contributes a single None.
        """
        values = []
        for cell in row["Data"]:
            values.extend(cell.values() or [None])
        return values

    def _create_dict_from_athena_result(self, response: Dict[str, Any], _key_result: List[str]) -> List[Dict[str, Any]]:
        """Convert the rows of one result page into dicts keyed by column name.

        Columns with no value in a row map to None. The rows are read in place
        and are not removed from ``response``.

        :param response: a ``get_query_results`` response whose header row
            (if it had one) has already been removed.
        :param _key_result: column names, in result-column order.
        """
        records = []
        for row in response["ResultSet"]["Rows"]:
            # Start every column at None so short rows still yield every key.
            record = dict.fromkeys(_key_result)
            record.update(zip(_key_result, self._get_data(row)))
            records.append(record)
        return records

    def _get_query_response(self, query_execution_id: str) -> List[Dict[str, Any]]:
        """Fetch every result page of a query and return all rows as dicts.

        :raises FailedExecution: propagated from _get_query_result.
        :raises ExecutionNotReady: propagated from _get_query_result.
        """
        page = 1
        log.info("Getting page %d", page)
        response = self._get_query_result(query_execution_id, None)
        rows = response["ResultSet"]["Rows"]
        if not rows:
            # Not even a header row to read; pop(0) below would raise
            # IndexError. Presumably statements without a result set land
            # here. TODO confirm which statement types do.
            log.info("Total data: %d", 0)
            return []
        # The first row of the first page holds the column names.
        keys = self._get_data(rows.pop(0))
        data = self._create_dict_from_athena_result(response, keys)
        while "NextToken" in response:
            page += 1
            log.info("Getting page %d", page)
            response = self._get_query_result(query_execution_id, response["NextToken"])
            data.extend(self._create_dict_from_athena_result(response, keys))
        log.info("Total data: %d", len(data))
        return data

    def run_query(self, query: str) -> List[Dict[str, Any]]:
        """Start ``query`` in Athena, wait for it, and return all result rows.

        Waiting is bounded by the retry budget on _get_query_result: about 18
        seconds of sleeping across 10 attempts. A query still running after
        that surfaces as ExecutionNotReady.

        ``query`` is sent verbatim, so do not interpolate untrusted input into it.

        :raises FailedExecution: if the query fails or is cancelled.
        :raises ExecutionNotReady: if the query does not finish in time.
        """
        log.info('Running query: %s', query)
        query_execution = self.client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': self.database},
            ResultConfiguration={'OutputLocation': self.s3_output},
            WorkGroup=self.workgroup,
        )
        return self._get_query_response(query_execution["QueryExecutionId"])