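I have an Athena table partitioned by date, with values like this:
20190218
I want to delete all the partitions that were created last year.
I tried the queries below, but they didn't work.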
ALTER TABLE tblname DROP PARTITION (partition1 < '20181231');
ALTER TABLE tblname DROP PARTITION (partition1 > '20181010'), Partition (partition1 < '20181231');
According to https://docs.aws.amazon.com/athena/latest/latest/alter-table-table-drop-partition.html, ALTER TABLE tblname DROP PARTITION takes a partition spec, so ranges are not allowed.
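Because a partition spec only accepts exact values, one way to stay inside Athena SQL is to enumerate every date in the range and emit one PARTITION clause per date. The sketch below only builds the statement text; the function name is hypothetical, and it assumes the partition values are daily YYYYMMDD strings like the one in the question.

```python
from datetime import date, timedelta


def drop_partitions_statement(table: str, column: str, start: date, end: date) -> str:
    """Build one ALTER TABLE ... DROP statement covering every day in [start, end]."""
    if end < start:
        raise ValueError("end must not be before start")
    clauses = []
    day = start
    while day <= end:
        # Assumption: partition values are formatted as YYYYMMDD, e.g. 20190218.
        clauses.append(f"PARTITION ({column} = '{day:%Y%m%d}')")
        day += timedelta(days=1)
    # IF EXISTS lets the statement skip dates that have no partition.
    return f"ALTER TABLE {table} DROP IF EXISTS " + ", ".join(clauses) + ";"


print(drop_partitions_statement("tblname", "partition1",
                                date(2018, 12, 30), date(2018, 12, 31)))
```

The generated statement can then be submitted like any other Athena query. For long ranges it may be safer to split the dates into several smaller statements.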
In Presto you would run DELETE FROM tblname WHERE ..., but Athena does not support DELETE either.
For these reasons, you need to fall back on some external solution.
For example:
- list the files, as in https://stackoverflow.com/a/488824373/65458
- delete the files and their containing directories
- update the partition information (https://docs.aws.amazon.com/athena/latest/latest/msck-repair-table.html should be helpful)
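The first two steps above could be sketched roughly as follows. This assumes the data sits in S3 under Hive-style paths such as tbl/partition1=20181015/file, which the question does not confirm. The function names are hypothetical, and s3_client is expected to be something like boto3.client("s3"). The partition metadata still has to be updated separately afterwards.

```python
import re
from typing import Iterable, Iterator


def keys_in_range(keys: Iterable[str], column: str, start: str, end: str) -> Iterator[str]:
    """Yield keys whose Hive-style partition value lies in [start, end]."""
    pattern = re.compile(rf"(?:^|/){re.escape(column)}=(\d{{8}})/")
    for key in keys:
        match = pattern.search(key)
        # YYYYMMDD strings sort the same way as the dates they encode.
        if match and start <= match.group(1) <= end:
            yield key


def delete_partition_files(s3_client, bucket: str, prefix: str, column: str,
                           start: str, end: str) -> int:
    """Delete objects under prefix whose partition value is in range; return the count."""
    paginator = s3_client.get_paginator("list_objects_v2")
    deleted = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys = [obj["Key"] for obj in page.get("Contents", [])]
        doomed = list(keys_in_range(keys, column, start, end))
        if doomed:
            # A listing page holds at most 1000 keys, which is also the most
            # that delete_objects accepts in a single request.
            s3_client.delete_objects(
                Bucket=bucket,
                Delete={"Objects": [{"Key": key} for key in doomed]},
            )
            deleted += len(doomed)
    return deleted
```

Passing the client in as a parameter keeps the filtering logic easy to try out against a stub before pointing it at a real bucket.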
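Although Athena SQL may not support this at the moment, the Glue API call GetPartitions (which Athena uses under the hood for queries) supports complex filter expressions, similar to what you can write in a SQL WHERE expression.
Instead of deleting the partitions through Athena, you can call GetPartitions through the Glue API and then BatchDeletePartition.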
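A minimal sketch of that approach with a filter expression might look like this. The function names are hypothetical, glue_client is assumed to be boto3.client("glue"), and the example expression assumes the partition column is a string named partition1 as in the question.

```python
from typing import Dict, Iterator, List


def chunked(items: List[Dict], size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of items with at most size elements each."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


def delete_partitions_matching(glue_client, database: str, table: str,
                               expression: str) -> int:
    """Delete the partitions selected by a Glue filter expression; return how many."""
    paginator = glue_client.get_paginator("get_partitions")
    to_delete: List[Dict] = []
    # Collect everything first so deletions do not interfere with pagination.
    for page in paginator.paginate(DatabaseName=database, TableName=table,
                                   Expression=expression):
        to_delete.extend({"Values": p["Values"]} for p in page["Partitions"])
    # BatchDeletePartition accepts at most 25 partitions per request.
    for batch in chunked(to_delete, 25):
        # Note: the Errors list in each response is not inspected in this sketch.
        glue_client.batch_delete_partition(DatabaseName=database, TableName=table,
                                           PartitionsToDelete=batch)
    return len(to_delete)
```

For the question's case, the call could be delete_partitions_matching(glue_client, "database_name", "tblname", "partition1 >= '20180101' AND partition1 <= '20181231'"). This only removes catalog entries; the underlying files stay where they are.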
Here is a script that does what theo recommended.
import json
import logging

import awswrangler as wr
import boto3
from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO, format=logging.BASIC_FORMAT)
logger = logging.getLogger()


def delete_partitions(database_name: str, table_name: str):
    """Delete every partition of the table from the Glue catalog, page by page."""
    client = boto3.client('glue')
    paginator = client.get_paginator('get_partitions')
    page_count = 0
    partition_count = 0
    # Pages of 20 stay under the 25-partition limit of BatchDeletePartition.
    for page in paginator.paginate(DatabaseName=database_name, TableName=table_name, MaxResults=20):
        page_count += 1
        partitions = page['Partitions']
        partitions_to_delete = []
        for partition in partitions:
            partition_count += 1
            partitions_to_delete.append({'Values': partition['Values']})
            logger.info(f"Found partition {partition['Values']}")
        if partitions_to_delete:
            response = client.batch_delete_partition(DatabaseName=database_name, TableName=table_name,
                                                     PartitionsToDelete=partitions_to_delete)
            logger.info(f'Deleted partitions with response: {response}')
        else:
            logger.info('Done with all partitions')


def repair_table(database_name: str, table_name: str):
    """Run MSCK REPAIR TABLE through Athena and wait for it to finish."""
    client = boto3.client('athena')
    try:
        response = client.start_query_execution(QueryString='MSCK REPAIR TABLE ' + table_name + ';',
                                                QueryExecutionContext={'Database': database_name})
    except ClientError as err:
        logger.info(err.response['Error']['Message'])
    else:
        res = wr.athena.wait_query(query_execution_id=response['QueryExecutionId'])
        # default=str keeps json.dumps from failing on datetime values in the result.
        logger.info(f"Query succeeded: {json.dumps(res, indent=2, default=str)}")


if __name__ == '__main__':
    table = 'table_name'
    database = 'database_name'
    delete_partitions(database_name=database, table_name=table)
    repair_table(database_name=database, table_name=table)
Posting a Glue API workaround in Java to save some time for anyone who needs it:
public void deleteMetadataTablePartition(String catalog,
                                         String db,
                                         String table,
                                         String expression) {
    GetPartitionsRequest getPartitionsRequest = new GetPartitionsRequest()
            .withCatalogId(catalog)
            .withDatabaseName(db)
            .withTableName(table)
            .withExpression(expression);

    // Collect the values of every partition matching the filter expression,
    // following NextToken until the last page.
    List<PartitionValueList> partitionsToDelete = new ArrayList<>();
    do {
        GetPartitionsResult getPartitionsResult = this.glue.getPartitions(getPartitionsRequest);
        List<PartitionValueList> partitionsValues = getPartitionsResult.getPartitions()
                .parallelStream()
                .map(p -> new PartitionValueList().withValues(p.getValues()))
                .collect(Collectors.toList());
        partitionsToDelete.addAll(partitionsValues);
        getPartitionsRequest.setNextToken(getPartitionsResult.getNextToken());
    } while (getPartitionsRequest.getNextToken() != null);

    // Delete in batches of 25 (Lists.partition comes from Guava).
    Lists.partition(partitionsToDelete, 25)
            .parallelStream()
            .forEach(partitionValueList -> {
                glue.batchDeletePartition(
                        new BatchDeletePartitionRequest()
                                .withCatalogId(catalog)
                                .withDatabaseName(db)
                                .withTableName(table)
                                .withPartitionsToDelete(partitionValueList));
            });
}