从 Spark 中删除分区



我正在使用Java-Spark(Spark 2.2.0(。

我正在尝试删除 Hive 分区,如下所示:

spark.sql(""ALTER TABLE backup DROP PARTITION (date < '20180910')"

并得到以下异常:

org.apache.spark.sql.catalyst.parser.ParseException: 不匹配的输入"<"期望 {'(', ','}(第 1 行,位置 42(

我知道这是未解决的问题 ALTER TABLE DROP PARTITION 应该支持应该在我的版本中修复的比较器,但我仍然有例外。

从 Spark 中删除分区的替代方案是什么?还有另一种实现可以做到这一点吗?

谢谢。

似乎暂时没有办法做到这一点。 如 SPARK-14922 所示,此修复的目标版本为 3.0.0,并且仍在进行中。

因此,我认为有两种可能的解决方法。

让我们使用 Spark 2.4.3 设置问题:

// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")
// Enable dynamic partitioning 
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))
// Verify inserts
spark.table("potato").count // 9 records

现在。。。尝试从内部删除单个分区会起作用!

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour='2020-06-07T01')""")
spark.table("potato").count // 8 records

尝试删除多个分区不起作用。

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
Found duplicate keys 'hour'.(line 1, pos 34)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")
----------------------------------^^^

使用比较运算符删除一系列分区也不起作用。

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<=' expecting {')', ','}(line 1, pos 49)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")
-------------------------------------------------^^^

据说发生这种情况是因为分区列是一个字符串,我们正在使用比较运算符。

我找到的解决方案是:

  1. 获取分区列表并有条件地过滤它们。
  2. 要么
  3. 逐个删除单个分区,要么将它们作为[Map[String,String]序列(TablePartitionSpec(传递给目录的dropPartitions函数。

第 1 步:

// Get External Catalog
val catalog = spark.sharedState.externalCatalog
// Get the spec from the list of partitions 
val partitions = catalog.listPartitions("default", "potato").map(_.spec)
// Filter according to the condition you attempted.
val filteredPartitions = partitions.flatten.filter(_._2 <= "2020-06-07T03")
.map(t => Map(t._1 -> t._2))

第 2 步:

我们将每个参数元组传递给单独的 ALTER TABLE DROP PARTITION 语句。

filteredPartitions.flatten.foreach(t => 
spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION (${t._1}="${t._2}")"""))
spark.table("potato").count // 6 records

或者将它们传递给目录的dropPartition函数。

// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", filteredPartitions,
ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 6 records

我希望这是有帮助的。让我知道您是否有更好的Spark 2.x解决方案。

你可以用 Spark 编程做同样的事情。此外,它也没有固定在 Spark 2、2.1 和 2.2 中以供参考 https://issues.apache.org/jira/browse/SPARK-14922

Steps 
1 . Create hive context 
2 . Get the table for getTable method from the hive context and you need to pass dbName, tableName and a boolean value if any error
3 . From table Object hive.getPartitions(table) you can get the partitions from hive context (you need to decide which partitions you are going delete )
4 . You can remove partitions using dropPartition with partition values , table name and db info (hive.dropPartition) 
hiveContext.getPartitions(table)
hiveContext.dropPartition(dbName, tableName, partition.getValues(), true)

You need to validate the partition name and check whether it needs to be deleted or not (you need to write custom method ).
Or you can get the partition list sql using show partitions and from there also you can use drop partition to remove it.
This may give you some pointers .

Pyspark People 的解决方案

  1. 获取表的所有分区。

  2. 将隐藏分区列转换为分区列表。

  3. 清理分区以仅获取值。

  4. 具有所需条件的筛选器列表。

  5. 对所有筛选列表执行全部更改表操作。 请在下面找到 pyspark 格式的代码

    partitions = spark.sql("SHOW PARTITIONS potato")
    listpartitions = list(partitions.select('partition').toPandas()['partition'])
    cleanpartitions = [ i.split('=')[1] for i in listpartitions]
    filtered = [i for i in cleanpartitions if i < str(20180910)]
    for i in filtered:
    spark.sql("alter table potato DROP IF EXISTS PARTITION (date = '"+i+"')")
    

我认为这里的问题是你使用符号'<' (lessthen)如此不知不觉,你的数据必须是数字或日期类型形式,但你把它放在''意味着它采用字符串格式的值。我建议您必须检查分区的格式。也许您必须以正确的日期格式进行投射。

为什么不遵循最简单的方法呢?

alter table t drop if exists partition(x=1),partition(x=2),partition(x=3)

使用一种方法拉取要删除的所有分区并将其传递到字符串中,并将字符串作为命令传递 Initial_string="如果存在,则更改表 t 删除" Initial_string+分区(partition_name=分区(+分区(partition_name=分区2(

最新更新