如何在通过AWS Athena查询结果进行分页时跳过标题



我有一个Angular 6应用程序,它从AWS Lambda请求数据。数据本身存储在Glue数据库中,并通过AWS Athena进行查询。AWS Glue数据库设置了skip.header.line.count=1选项,当我在控制台中运行Athena查询时,我会得到一个没有头的响应。当我尝试使用boto3检索数据时,就会出现问题。我有一个函数,它运行一个查询,然后对结果进行分页:

def run_query_paged(self, query, page_token=None, page_size=10):
"""
Run query.
"""
request = self.athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': self.database
},
ResultConfiguration={
'OutputLocation': self.s3_output,
}
)
execution_id = request['QueryExecutionId']
if execution_id:
while True:
stats = self.athena_client.get_query_execution(QueryExecutionId=execution_id)
status = stats['QueryExecution']['Status']['State']
if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(0.2)  # 200ms
if status == 'SUCCEEDED':
paginator = self.athena_client.get_paginator('get_query_results')
pagination_config = {
'MaxItems': page_size,
'PageSize': page_size,
}
if page_token:
pagination_config['StartingToken'] = page_token
response_iterator = paginator.paginate(
QueryExecutionId=execution_id,
PaginationConfig=pagination_config
)
for page in response_iterator:
next_token = page.get('NextToken', '')
results = page
break
return {
'rows': process_results(results),
'nextToken': next_token
}
if status == 'FAILED':
raise Exception(stats['QueryExecution']['Status']['StateChangeReason'])
return None

process_results函数将响应转换为考虑列类型的列表:

def process_results(response):
"""
Processes the result of get_query_results function
"""
rows = response['ResultSet']['Rows']
meta = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
result = []
for row in rows:
parsed_row = {}
for idx, val in enumerate(row['Data']):
field = val
column_info = meta[idx]
if 'VarCharValue' in val:
value = val['VarCharValue']
else:
value = ''
parsed_row[column_info['Name']] = process_row_value(value, column_info)
result.append(parsed_row)
return result

问题是,分页响应的第一页的标题的列名如下:

{
"foo": "foo",
"bar": "bar"
},
{
"foo": 1,
"bar": 2
},
...

而所有其他页面都没有。当我从客户端应用程序请求第一页时,我会得到一个标题加9行(页面大小为10(,当我使用NextToken请求下一页时,会得到10行,没有标题。在第一页中显示9个项目,在所有后续页面中显示10个项目,这是非常尴尬的。

如何跳过标题对结果进行分页?

我没有找到任何跳过标头的选项,并通过请求第一个请求的page_size + 1结果,然后请求其余的page_size来破解它。

def _build_response(self, execution_id: str, starting_token: Optional[str], page_size: int) -> AthenaPagedResult:
"""
Returns the query result for the provided page as well as a token to the next page if there are more
results to retrieve for the query.
"""
paginator = self.athena_client.get_paginator('get_query_results')
# The first page of response contains header. Increase the page size for a first page and then
# remove header so that all the pages would have the same size.
if starting_token:
skip_header = False
else:
page_size += 1
skip_header = True
max_items = page_size * 2
pagination_config = {
'MaxItems': min(max_items, MAXIMUM_ALLOWED_ITEMS_NUMBER),
'PageSize': min(page_size, MAXIMUM_ALLOWED_ITEMS_NUMBER)
}
if starting_token:
pagination_config['StartingToken'] = starting_token
response_iterator = paginator.paginate(QueryExecutionId=execution_id, PaginationConfig=pagination_config)

iterator_index = 0
results = EMPTY_ATHENA_RESPONSE
next_token = None
# Retrieve only a single page and return the next token for the caller to iterate the response.
for page in response_iterator:
if iterator_index > 0:
if len(page['ResultSet']['Rows']) == 0:
next_token = None
break
next_token = page.get('NextToken')
results = page
iterator_index += 1
# ... process and return results

图我会添加我的。我将其分为三个部分——启动查询,在查询结果页面中分页,并将结果标准化为列表/字典列表:

import boto3
import logging
from time import sleep
def query_athena_table(sql_query, database, **kwargs):
client = boto3.client('athena')
query_started = client.start_query_execution(
QueryString=sql_query,
QueryExecutionContext={'Database': database},
ResultConfiguration={"OutputLocation": f"s3://your-specific-athena-query-results-bucket"}
)
timeout_value = kwargs.get("timeout", 15) * 1000 # bc its in milliseconds
finished = False
logging.info("Started Athena Query")
while not finished:
query_in_flight = client.get_query_execution(QueryExecutionId=query_started["QueryExecutionId"])
query_status = query_in_flight["QueryExecution"]["Status"]["State"]
if query_status == 'SUCCEEDED':
finished = True
elif query_status in ['FAILED', 'CANCELLED']:
logging.error(query_in_flight['QueryExecution']['Status']['StateChangeReason'])
return None
elif timeout_value < ez_get(query_in_flight, "QueryExecution", "Statistics", "TotalExecutionTimeInMillis"):
logging.warning(f"Query timed out with no response (timeout val: {timeout_value})")
return None
else:
sleep(kwargs.get("wait_interval", 0.1))
return paginate_athena_response(client, query_started["QueryExecutionId"], **kwargs)
# about 4s per 10k rows, with a floor of ~0.33s if only one page
def paginate_athena_response(client, execution_id: str, **kwargs):
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
QueryExecutionId=execution_id, 
PaginationConfig={
'MaxItems': kwargs.get("max_results", 100000),
'PageSize': 1000,
'StartingToken': kwargs.get("pagination_starting_token", None),
})
results = []
# Iterate through pages. The NextToken logic is handled for you.
for n, page in enumerate(response_iterator):
logging.info(f"Now on page {n}, rows on this page: {len(page['ResultSet']['Rows'])}")
if n > 0 and len(page['ResultSet']['Rows']) == 0: # probably redundant
break
results += standardize_athena_query_result(page, **kwargs)
kwargs["headers"] = list(results[0].keys()) # prevent parser from .pop(0) after 1st page
return results
def standardize_athena_query_result(results, **kwargs):
results = [x["Data"] for x in results['ResultSet']['Rows']]
for n, row in enumerate(results):
results[n] = [x['VarCharValue'] for x in row]
if kwargs.get("output_lod"):
headers = kwargs.get("headers") or results.pop(0)
output_lod = []
for n, result_row in enumerate(results):
output_lod.append({headers[i]:result_row[i] for i in range(0, len(result_row))})
return output_lod
return results

最新更新