How to truncate data and drop all partitions from a Hive table using Spark



Using Spark 2.3.0, how can I delete all data and drop all partitions from a Hive table?
truncate table my_table; // Deletes all data, but keeps partitions in metastore
alter table my_table drop partition(p_col > 0) // does not work from spark

The only thing that worked for me was to iterate over show partitions my_table, replace / with , and drop each partition individually. But there has to be a cleaner way. It doesn't even work if the partition columns are of type string. Any suggestions?
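For reference, a minimal sketch of that per-partition workaround in Spark/Scala (the table name my_table is assumed; quoting the values is what lets it handle string partition columns):

// List all partitions and drop them one by one.
// A spec such as "hour=2020-06-07T01" becomes "hour='2020-06-07T01'".
spark.sql("SHOW PARTITIONS my_table").collect().foreach { row =>
  val spec = row.getString(0)
    .split("/")
    .map { kv =>
      val Array(k, v) = kv.split("=", 2)
      s"$k='$v'" // quote each value so string partition columns work too
    }
    .mkString(", ")
  spark.sql(s"ALTER TABLE my_table DROP IF EXISTS PARTITION ($spec)")
}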

Let's use Spark 2.4.3:

Setting up the problem:
// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")
// Enable dynamic partitioning 
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))
// Verify inserts
spark.table("potato").count // 9 records

We use the external catalog's listPartitions and dropPartitions functions.

// Get External Catalog
val catalog = spark.sharedState.externalCatalog
// Get the spec from the list of all partitions 
val partitions = catalog.listPartitions("default", "potato").map(_.spec)
// We pass them to the Catalog's dropPartitions function.
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", partitions,
                   ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions

This is all fine and well for MANAGED tables, but what about EXTERNAL tables?

// We repeat the setup above but after creating an EXTERNAL table
// After dropping we see that the partitions appear to be gone (or are they?).
catalog.listPartitions("default", "potato").length // 0 partitions
// BUT repairing the table simply adds them again, the partitions/data 
// were NOT deleted from the underlying filesystem. This is not what we wanted!
spark.sql("MSCK REPAIR TABLE potato")
catalog.listPartitions("default", "potato").length // 9 partitions again!   

To handle this, we change the table from EXTERNAL to MANAGED before dropping the partitions.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
// Identify the table in question
val identifier = TableIdentifier("potato", Some("default"))
// Get its current metadata
val tableMetadata = catalog.getTableMetadata(identifier)
// Clone the metadata while changing the tableType to MANAGED
val alteredMetadata = tableMetadata.copy(tableType = CatalogTableType.MANAGED)
// Alter the table using the new metadata
catalog.alterTable(alteredMetadata)
// Now drop!
catalog.dropPartitions("default", "potato", partitions,
                   ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
spark.sql("MSCK REPAIR TABLE potato") // Won't add anything
catalog.listPartitions("default", "potato").length // Still 0 partitions!

Don't forget to change the table back to EXTERNAL using CatalogTableType.EXTERNAL, as sketched below.
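A minimal sketch, reusing the catalog and identifier values defined above:

// Flip the table type back to EXTERNAL once the partitions are gone.
val restoredMetadata = catalog.getTableMetadata(identifier)
  .copy(tableType = CatalogTableType.EXTERNAL)
catalog.alterTable(restoredMetadata)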

Hive has two types of tables (managed tables and external tables). Managed tables are created when Hive is meant to manage the entire schema as well as the data. Therefore, dropping a Hive managed table drops the schema, the metadata and the data. However, the data of an external table lives elsewhere (for example, in an external source such as S3). So dropping an external table only drops the metadata and the table definition, but the data remains intact at the source.
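To illustrate the difference, a small sketch (the table names and the LOCATION /tmp/potato_ext are just examples):

// Managed table: DROP removes the metadata and deletes the data under the warehouse directory.
spark.sql("CREATE TABLE managed_potato (size INT) PARTITIONED BY (hour STRING)")
spark.sql("DROP TABLE managed_potato") // data directory is deleted

// External table: DROP only removes the metadata; the files stay at the LOCATION.
spark.sql("CREATE EXTERNAL TABLE external_potato (size INT) PARTITIONED BY (hour STRING) LOCATION '/tmp/potato_ext'")
spark.sql("DROP TABLE external_potato") // files under /tmp/potato_ext remain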

In your case, when you truncate the table, Hive keeps the bookkeeping because the table still exists in Hive; only the data is removed. Also, the metastore does not hold the data itself, since it only contains information about the schema and other related table details.

I hope this answers the question to some extent.

EDIT 1:

A similar post
