Unable to read Athena query into a pandas DataFrame

I have the code below and would expect it to correctly return a DataFrame. The polling logic works, but the DataFrame never seems to get created/returned. Right now it just returns None when called.

import boto3
import pandas as pd
import io
import re
import time

AK = 'mykey'
SAK = 'mysecret'

params = {
    'region': 'us-west-2',
    'database': 'default',
    'bucket': 'my-bucket',
    'path': 'dailyreport',
    'query': 'SELECT * FROM v_daily_report LIMIT 100'
}

session = boto3.Session(aws_access_key_id=AK, aws_secret_access_key=SAK)

def athena_query(client, params):
    response = client.start_query_execution(
        QueryString=params["query"],
        QueryExecutionContext={
            'Database': params['database']
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        }
    )
    return response

def athena_to_s3(session, params, max_execution=5):
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']
    df = poll_status(execution_id, client)
    return df

def poll_status(_id, client):
    '''
    poll query status
    '''
    result = client.get_query_execution(
        QueryExecutionId=_id
    )
    state = result['QueryExecution']['Status']['State']

    if state == 'SUCCEEDED':
        print(state)
        print(str(result))
        s3_key = 's3://' + params['bucket'] + '/' + params['path'] + '/' + _id + '.csv'
        print(s3_key)
        df = pd.read_csv(s3_key)
        return df
    elif state == 'QUEUED':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'RUNNING':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'FAILED':
        return result
    else:
        print(state)
        raise Exception

df_data = athena_to_s3(session, params)
print(df_data)

I plan to move the DataFrame loading out of the polling function, but for now I just want to get it working as-is.
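
For reference, a minimal sketch of that planned refactor (hypothetical, reusing the question's names and definitions): an iterative poll loop that returns the terminal state, with the CSV load done afterwards. Note that the QUEUED/RUNNING branches above recurse into poll_status without returning its value, so those paths evaluate to None.

# Sketch of the refactor: polling returns state only; the load happens outside.
def poll_status(_id, client, max_execution=5):
    '''Poll until the query reaches a terminal state or retries run out.'''
    while max_execution > 0:
        result = client.get_query_execution(QueryExecutionId=_id)
        state = result['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return state, result
        time.sleep(1)          # still QUEUED/RUNNING; wait and retry
        max_execution -= 1
    return 'TIMED_OUT', None

def athena_to_s3(session, params, max_execution=5):
    client = session.client('athena', region_name=params['region'])
    execution_id = athena_query(client, params)['QueryExecutionId']
    state, result = poll_status(execution_id, client, max_execution)
    if state != 'SUCCEEDED':
        return None
    # Load the result CSV only once the query has succeeded.
    s3_key = 's3://' + params['bucket'] + '/' + params['path'] + '/' + execution_id + '.csv'
    return pd.read_csv(s3_key)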

Rather than using the traditional boto3 Athena API, I would suggest looking at AWS Wrangler. This newer, more specific interface covers everything data-related in AWS, including queries to Athena, and offers more functionality.

import awswrangler as wr

df = wr.pandas.read_sql_query(
    sql="select * from table",
    database="database"
)
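
Note that the snippet above targets the pre-1.0 awswrangler API. In awswrangler 1.x and later the pandas helpers were reorganized under per-service modules, so the equivalent call looks like this:

import awswrangler as wr

# awswrangler >= 1.0: Athena helpers live under wr.athena
df = wr.athena.read_sql_query(
    sql="select * from table",
    database="database"
)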

Thanks to @RagePown's comment, PyAthena is also worth checking out as an alternative to the boto3 option for querying Athena.

If None is being returned, it is because state == "FAILED". You need to investigate the reason it failed, which may be in "StateChangeReason".

{
    'QueryExecution': {
        'QueryExecutionId': 'string',
        'Query': 'string',
        'StatementType': 'DDL'|'DML'|'UTILITY',
        'ResultConfiguration': {
            'OutputLocation': 'string',
            'EncryptionConfiguration': {
                'EncryptionOption': 'SSE_S3'|'SSE_KMS'|'CSE_KMS',
                'KmsKey': 'string'
            }
        },
        'QueryExecutionContext': {
            'Database': 'string'
        },
        'Status': {
            'State': 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED',
            'StateChangeReason': 'string',
            'SubmissionDateTime': datetime(2015, 1, 1),
            'CompletionDateTime': datetime(2015, 1, 1)
        },
        'Statistics': {
            'EngineExecutionTimeInMillis': 123,
            'DataScannedInBytes': 123,
            'DataManifestLocation': 'string',
            'TotalExecutionTimeInMillis': 123,
            'QueryQueueTimeInMillis': 123,
            'QueryPlanningTimeInMillis': 123,
            'ServiceProcessingTimeInMillis': 123
        },
        'WorkGroup': 'string'
    }
}
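
A small sketch of how you might surface that field inside a branch like the question's poll_status (client and _id as in the question; .get() is used since StateChangeReason is not always present):

result = client.get_query_execution(QueryExecutionId=_id)
status = result['QueryExecution']['Status']
if status['State'] == 'FAILED':
    # StateChangeReason explains why Athena failed the query
    print(status.get('StateChangeReason', 'no reason given'))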

Just to elaborate on RagePon's answer about using PyAthena, which is what I ended up doing as well. For some reason AwsWrangler choked on me and couldn't handle the JSON being returned from S3. Here is the code snippet, based on PyAthena's PyPI page, that worked for me:

import os
from pyathena import connect
from pyathena.util import as_pandas

aws_access_key_id = os.getenv('ATHENA_ACCESS_KEY')
aws_secret_access_key = os.getenv('ATHENA_SECRET_KEY')
region_name = os.getenv('ATHENA_REGION_NAME')
staging_bucket_dir = os.getenv('ATHENA_STAGING_BUCKET')

cursor = connect(aws_access_key_id=aws_access_key_id,
                 aws_secret_access_key=aws_secret_access_key,
                 region_name=region_name,
                 s3_staging_dir=staging_bucket_dir,
                 ).cursor()
sql = 'SELECT * FROM v_daily_report LIMIT 100'  # placeholder; any Athena query
cursor.execute(sql)
df = as_pandas(cursor)

The above assumes you have defined the following as environment variables:

  • ATHENA_ACCESS_KEY: the AWS access key id for your AWS account
  • ATHENA_SECRET_KEY: the AWS secret key
  • ATHENA_REGION_NAME: the AWS region name
  • ATHENA_STAGING_BUCKET: a bucket in the same account with the correct access settings (explaining those is outside the scope of this answer)
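
As a follow-up, PyAthena also ships a PandasCursor that fetches results into a DataFrame directly, replacing the as_pandas step. A minimal sketch under the same environment-variable assumptions (the import path shown is the one used in recent PyAthena releases):

import os
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor  # import path in recent PyAthena releases

# Same environment variables as above
cursor = connect(aws_access_key_id=os.getenv('ATHENA_ACCESS_KEY'),
                 aws_secret_access_key=os.getenv('ATHENA_SECRET_KEY'),
                 region_name=os.getenv('ATHENA_REGION_NAME'),
                 s3_staging_dir=os.getenv('ATHENA_STAGING_BUCKET'),
                 cursor_class=PandasCursor).cursor()
df = cursor.execute('SELECT * FROM v_daily_report LIMIT 100').as_pandas()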
