使用wr.s3.to_parquet
时,我可以使用格式化字符串文本构建路径,并使用该模式拥有现有文件夹。
def SaveInS3_test(Ticker, Granularity, Bucket, df, keyPrefix=""):
year, month, day = datetime.utcnow().strftime("%Y/%m/%d/%H").split("/")[0:3]
path = (
f"s3://{Bucket}/{keyPrefix}{year}/{month}/{day}/{Ticker}/{Granularity}.parquet"
)
print(path)
wr.s3.to_parquet(df, path, index=True, dataset=True, mode="append")
df=pd.DataFrame({'col': [1, 2, 3]})
SaveInS3_test("GBP", "H1","my_bucket", df, keyPrefix="Test/")
路径将是这样的:
s3://my_bucket/Test/2022/08/06/GBP/H1.parquet
我想使用牧者的Athena/Glue数据库功能如下(这项工作(:
wr.s3.to_parquet(
df=df,
path=f's3://my_bucket',
dataset=True,
database='default', # Athena/Glue database
table='my_table') # Athena/Glue table
我是否可以使用我的F-字符串方法以某种方式使用此数据库功能的路径结构?:
s3://my_bucket/Test/2022/08/06/GBP/H1.parquet
我不确定我将如何使用分区或类似的功能来实现这一点。
我使用路径的任何尝试都会返回InvalidArgumentValue
,因为它与现有的Glue目录表路径不匹配。
解决方案
def save_in_s3(df, ticker, granularity, bucket, prefix):
dt = datetime.utcnow()
# Important! Create new partition cols
df = df.assign(**{
'ticker': ticker, 'granularity': granularity,
'year': dt.year, 'month': dt.month, 'day': dt.day
})
# Write to s3
wr.s3.to_parquet(
df,
path=f's3://{bucket}/{prefix}',
table='table_name',
database='database_name',
index=False,
dataset=True,
partition_cols=['year', 'month', 'day', 'ticker', 'granularity']
)
已制定的示例
假设我们在两个不同的日期用不同的参数调用函数:
# Called on 2022-08-06
save_in_s3(df, ticker='GBP', granularity='H1', bucket='my_bucket', prefix='my_folder')
# Called on 2022-08-07
save_in_s3(df, ticker='INR', granularity='H2', bucket='my_bucket', prefix='my_folder')
现在,假设您已经在glue中创建了数据库,上面的函数调用将在s3中输出以下文件:
s3://my_bucket/my_folder/year=2022/month=8/day=6/ticker=GBP/granularity=H1/*.parquet
s3://my_bucket/my_folder/year=2022/month=8/day=7/ticker=INR/granularity=H2/*.parquet
相应的athena表将具有以下分区:
SHOW PARTITIONS "database_name"."table_name";
-- year=2022/month=8/day=6/ticker=GBP/granularity=H1
-- year=2022/month=8/day=7/ticker=INR/granularity=H2
从本质上讲,分区和s3文件夹结构将由wrapper自动为您创建。