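I use AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I use a Lambda function to fetch the data and turn the Athena query results into a dataframe.
My code: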
def athena_query_to_dataframe(db, s3Bucket, query):
    import boto3
    import pandas as pd
    client = boto3.client('athena')
    listOfStatus = ['SUCCEEDED', 'FAILED', 'CANCELLED']
    listOfInitialStatus = ['RUNNING', 'QUEUED']
    print('Starting Query Execution:')
    tempS3Path = 's3://{}'.format(s3Bucket)
    response = client.start_query_execution(
        QueryString = query,
        QueryExecutionContext = {
            'Database': db
        },
        ResultConfiguration = {
            'OutputLocation': tempS3Path,
        }
    )
    queryExecutionId = response['QueryExecutionId']
    status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
    while status in listOfInitialStatus:
        status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
        if status in listOfStatus:
            if status == 'SUCCEEDED':
                print('Query Succeeded!')
                paginator = client.get_paginator('get_query_results')
                query_results = paginator.paginate(
                    QueryExecutionId = queryExecutionId,
                    PaginationConfig = {'PageSize': 1000}
                )
            elif status == 'FAILED':
                print('Query Failed!')
            elif status == 'CANCELLED':
                print('Query Cancelled!')
            break
    results = []
    rows = []
    print('Processing Response')
    for page in query_results:
        for row in page['ResultSet']['Rows']:
            rows.append(row['Data'])
    columns = rows[0]
    rows = rows[1:]
    columns_list = []
    for column in columns:
        columns_list.append(column['VarCharValue'])
    print('Creating Dataframe')
    dataframe = pd.DataFrame(columns = columns_list)
    for row in rows:
        df_row = []
        try:
            for data in row:
                df_row.append(data['VarCharValue'])
            dataframe.loc[len(dataframe)] = df_row
        except:
            pass
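When I return df.shape I only get (0, 20), which means the columns were created but no rows were added to the dataframe.
What I am looking for:
- A fix for the problem above so that the rows are populated, and a better way to get the data into a dataframe if there is one.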
The simplest answer is to use awswrangler.
import awswrangler as wr
df = wr.athena.read_sql_query(sql="SELECT * FROM my_athena_table", database="my_database")
Replace my_athena_table and my_database with your own values.
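If permissions are the concern, awswrangler's read_sql_query accepts a ctas_approach flag. As far as I understand from the awswrangler documentation, the default (ctas_approach=True) wraps the query in a temporary CTAS table and therefore needs Glue create/delete table permissions, while ctas_approach=False reads the regular query output and avoids those, at the cost of slower reads on large results. A minimal sketch, where the wrapper name read_athena_without_ctas and the s3_output placeholder are my own and not part of the library:

```python
def read_athena_without_ctas(sql, database, s3_output=None):
    """Run an Athena query through awswrangler without the CTAS temporary table.

    Hypothetical helper name; only wr.athena.read_sql_query is library API.
    """
    import awswrangler as wr  # imported lazily; requires `pip install awswrangler`

    return wr.athena.read_sql_query(
        sql=sql,
        database=database,
        ctas_approach=False,  # read the plain query output instead of a CTAS table
        s3_output=s3_output,  # e.g. "s3://my-bucket/athena-results/" (placeholder)
    )
```

Called as read_athena_without_ctas("SELECT * FROM my_athena_table", "my_database"), it returns the same kind of dataframe as the call above.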
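You can also use the pandas built-in function pd.read_sql(), since awswrangler needs several permissions to run. Note that pd.read_sql() requires a connection as its second argument; the example below assumes one created with PyAthena, and the staging directory and region are placeholders.
import pandas as pd
from pyathena import connect  # assumption: PyAthena provides the DB-API connection
conn = connect(s3_staging_dir="s3://my-bucket/athena-results/", region_name="us-east-1")  # placeholder values
df = pd.read_sql("select * from db_name.table_name", conn)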
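As for the original function: the most likely reason for the (0, 20) shape is that Athena returns NULL cells as an empty dict with no 'VarCharValue' key, so data['VarCharValue'] raises a KeyError and the bare except silently drops the whole row. If every row contains at least one NULL, no row survives. The polling loop also calls the API without pausing, and query_results is never assigned if the query has already finished before the loop starts. Below is a minimal sketch of a corrected version. The helper names wait_for_query and pages_to_table, the optional client parameter, and the timeout values are my own choices for illustration, and it assumes a SELECT query whose first result row holds the column names:

```python
import time


def wait_for_query(client, query_execution_id, poll_seconds=1.0, timeout_seconds=300.0):
    """Poll Athena until the query leaves QUEUED/RUNNING; return the final state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_execution_id)
        state = execution['QueryExecution']['Status']['State']
        if state not in ('QUEUED', 'RUNNING'):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError('Athena query {} is still {}'.format(query_execution_id, state))
        time.sleep(poll_seconds)  # pause instead of calling the API in a tight loop


def pages_to_table(pages):
    """Flatten get_query_results pages into (column_names, rows)."""
    rows = []
    for page in pages:
        for row in page['ResultSet']['Rows']:
            # NULL cells arrive as {} with no 'VarCharValue', so use .get()
            rows.append([cell.get('VarCharValue') for cell in row['Data']])
    if not rows:
        return [], []
    return rows[0], rows[1:]  # assumption: the first row is the header


def athena_query_to_dataframe(db, s3_bucket, query, client=None):
    """Run a query on Athena and return the result as a pandas DataFrame."""
    import pandas as pd  # imported lazily so the helpers above work without pandas
    if client is None:
        import boto3
        client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': db},
        ResultConfiguration={'OutputLocation': 's3://{}'.format(s3_bucket)},
    )
    query_execution_id = response['QueryExecutionId']
    state = wait_for_query(client, query_execution_id)
    if state != 'SUCCEEDED':
        # fail loudly instead of continuing with an undefined result set
        raise RuntimeError('Athena query ended in state {}'.format(state))
    paginator = client.get_paginator('get_query_results')
    pages = paginator.paginate(
        QueryExecutionId=query_execution_id,
        PaginationConfig={'PageSize': 1000},
    )
    columns, rows = pages_to_table(pages)
    # build the frame in one call rather than appending row by row with .loc
    return pd.DataFrame(rows, columns=columns)
```

Every value comes back as a string (or None for NULL), so numeric or date columns still need converting afterwards, for example with pd.to_numeric. Building the dataframe once from a list of rows is also much faster than appending with dataframe.loc inside a loop.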