使用AWS athena查询和Boto3创建数据框架



我使用AWS Athena从S3查询原始数据。由于雅典娜将查询输出写入S3输出桶,我使用Lambda函数获取数据,这是雅典娜查询结果到数据框:

我代码:

def athena_query_to_dataframe(db, s3Bucket, query):

import boto3
import pandas as pd

client = boto3.client('athena')
listOfStatus = ['SUCCEEDED', 'FAILED', 'CANCELLED']
listOfInitialStatus = ['RUNNING', 'QUEUED']

print('Starting Query Execution:')

tempS3Path = 's3://{}'.format(s3Bucket)

response = client.start_query_execution(
QueryString = query,
QueryExecutionContext = {
'Database': db
},
ResultConfiguration = {
'OutputLocation': tempS3Path,
}
)
queryExecutionId = response['QueryExecutionId']
status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
while status in listOfInitialStatus:
status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
if status in listOfStatus:
if status == 'SUCCEEDED':
print('Query Succeeded!')
paginator = client.get_paginator('get_query_results')
query_results = paginator.paginate(
QueryExecutionId = queryExecutionId,
PaginationConfig = {'PageSize': 1000}
)
elif status == 'FAILED':
print('Query Failed!')
elif status == 'CANCELLED':
print('Query Cancelled!')
break

results = []
rows = []

print('Processing Response')

for page in query_results:
for row in page['ResultSet']['Rows']:
rows.append(row['Data'])
columns = rows[0]
rows = rows[1:]
columns_list = []
for column in columns:
columns_list.append(column['VarCharValue'])

print('Creating Dataframe')
dataframe = pd.DataFrame(columns = columns_list)
for row in rows:
df_row = []
try:
for data in row:
df_row.append(data['VarCharValue'])
dataframe.loc[len(dataframe)] = df_row
except:
pass

当我试图返回df.shape我只得到(0,20)这意味着df没有被行更新

我正在寻找下面的输出:

  1. 修复上述问题,以获得行填充以及
  2. 如果有更好的方法来获取数据帧

最简单的答案是使用awswrangler

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM my_athena_table", database="my_database")

用您的值替换my_athena_tablemy_database

还可以使用pandas内置函数pd.read_sql(),因为awswrangler需要多个权限才能执行。

import pandas as pd
df = pd.read_sql("select * from db_name.table_name")

最新更新