AWS雅典娜:删除日期范围之间的分区



我有一个基于这样的日期分区的雅典娜表:

20190218

我想删除去年创建的所有分区。

我尝试了以下查询,但它不起作用。

ALTER TABLE tblname DROP PARTITION (partition1 < '20181231');
ALTER TABLE tblname DROP PARTITION (partition1 > '20181010'), Partition (partition1 < '20181231');

根据https://docs.aws.amazon.com/athena/latest/latest/alter-table-table-drop-partition.html,ALTER TABLE tblname DROP PARTITION采用分区规格,因此没有范围被允许。

在Presto中,您会做DELETE FROM tblname WHERE ...,但DELETE也不支持雅典娜。

由于这些原因,您需要确实利用一些外部解决方案。

例如:

  1. 将文件列出如https://stackoverflow.com/a/488824373/65458
  2. 删除文件并包含目录
  3. 更新分区信息(https://docs.aws.amazon.com/athena/latest/latest/msck-repair-table.html应该有用)

虽然目前雅典娜SQL可能不支持它,但胶水API调用GetPartitions(Athena在引擎盖下用于查询)支持复杂的滤波器表达式,类似于您可以在A中写的内容SQL WHERE表达式。

而不是通过雅典娜删除分区,您可以使用胶API进行GetPartitions,然后使用BatchDeletePartition

这是脚本做的theo推荐。

import json
import logging
import awswrangler as wr
import boto3
from botocore.exceptions import ClientError
logging.basicConfig(level=logging.INFO, format=logging.BASIC_FORMAT)
logger = logging.getLogger()

def delete_partitions(database_name: str, table_name: str):
  client = boto3.client('glue')
  paginator = client.get_paginator('get_partitions')
  page_count = 0
  partition_count = 0
  for page in paginator.paginate(DatabaseName=database_name, TableName=table_name, MaxResults=20):
    page_count = page_count + 1
    partitions = page['Partitions']
    partitions_to_delete = []
    for partition in partitions:
      partition_count = partition_count + 1
      partitions_to_delete.append({'Values': partition['Values']})
      logger.info(f"Found partition {partition['Values']}")
    if partitions_to_delete:
      response = client.batch_delete_partition(DatabaseName=database_name, TableName=table_name,
        PartitionsToDelete=partitions_to_delete)
      logger.info(f'Deleted partitions with response: {response}')
    else:
      logger.info('Done with all partitions')

def repair_table(database_name: str, table_name: str):
  client = boto3.client('athena')
  try:
    response = client.start_query_execution(QueryString='MSCK REPAIR TABLE ' + table_name + ';',
      QueryExecutionContext={'Database': database_name}, )
  except ClientError as err:
    logger.info(err.response['Error']['Message'])
  else:
    res = wr.athena.wait_query(query_execution_id=response['QueryExecutionId'])
    logger.info(f"Query succeeded: {json.dumps(res, indent=2)}")

if __name__ == '__main__':
  table = 'table_name'
  database = 'database_name'
  delete_partitions(database_name=database, table_name=table)
  repair_table(database_name=database, table_name=table)

发布Java的胶API解决方法,以节省一些需要的时间:

public void deleteMetadataTablePartition(String catalog,
                                         String db,
                                         String table,
                                         String expression) {
    GetPartitionsRequest getPartitionsRequest = new GetPartitionsRequest()
            .withCatalogId(catalog)
            .withDatabaseName(db)
            .withTableName(table)
            .withExpression(expression);
    List<PartitionValueList> partitionsToDelete = new ArrayList<>();
    do {
        GetPartitionsResult getPartitionsResult = this.glue.getPartitions(getPartitionsRequest);
        List<PartitionValueList> partitionsValues = getPartitionsResult.getPartitions()
                .parallelStream()
                .map(p -> new PartitionValueList().withValues(p.getValues()))
                .collect(Collectors.toList());
        partitionsToDelete.addAll(partitionsValues);
        getPartitionsRequest.setNextToken(getPartitionsResult.getNextToken());
    } while (getPartitionsRequest.getNextToken() != null);
    Lists.partition(partitionsToDelete, 25)
            .parallelStream()
            .forEach(partitionValueList -> {
                glue.batchDeletePartition(
                        new BatchDeletePartitionRequest()
                                .withCatalogId(catalog)
                                .withDatabaseName(db)
                                .withTableName(table)
                                .withPartitionsToDelete(partitionValueList));
            });
}

相关内容

  • 没有找到相关文章

最新更新