有一个分区的 Hive 表
CREATE EXTERNAL TABLE IF NOT EXISTS CUSTOMER_PART (
NAME string ,
AGE int ,
YEAR INT)
PARTITIONED BY (CUSTOMER_ID decimal(15,0))
STORED AS PARQUET LOCATION 'HDFS LOCATION'
第一个加载是通过 PYSPARK 从 ORACLE 到 HIVE 完成
的,使用INSERT OVERWRITE TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID) SELECT NAME, AGE, YEAR, CUSTOMER_ID FROM CUSTOMER;
这工作正常,并在运行期间动态创建分区。现在,每天增量加载数据都会为分区下的单个记录创建单独的文件。
INSERT INTO TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID = 3) SELECT NAME, AGE, YEAR FROM CUSTOMER WHERE CUSTOMER_ID = 3; --Assume this gives me the latest record in the database
是否有可能将值附加到分区下的现有 parquet 文件,直到它达到块大小,而无需为每个插入创建较小的文件。
重写整个分区是一种选择,但我不想这样做
INSERT OVERWRITE TABLE CUSTOMER_PART PARTITION (CUSTOMER_ID = 3) SELECT NAME, AGE, YEAR FROM CUSTOMER WHERE CUSTOMER_ID = 3;
为配置单元设置了以下属性
set hive.execution.engine=tez; -- TEZ execution engine
set hive.merge.tezfiles=true; -- Notifying that merge step is required
set hive.merge.smallfiles.avgsize=128000000; --128MB
set hive.merge.size.per.task=128000000; -- 128MB
这仍然对日常插入没有帮助。任何可以遵循的替代方法都将非常有帮助。
据我所知,我们无法存储每日分区数据的单个文件,因为数据将由每天分区的不同部分文件存储。
由于您提到您正在从 Oracle 数据库导入数据,因此您每次都可以从 Oracle DB 导入整个数据并覆盖到 HDFS 中。通过这种方式,您可以维护单个零件文件。
此外,不建议将HDFS用于少量数据。
在这种情况下,我可以想到以下方法:
方法1:
重新创建 Hive 表,即在将增量数据加载到表中后CUSTOMER_PART
。
-
使用
CUSTOMER_PART
表数据的完整快照创建temp_CUSTOMER_PART
表。 -
运行覆盖最终表
CUSTOMER_PART
从temp_CUSTOMER_PART
表中进行选择 -
在这种情况下,您将拥有没有小文件的最终表。
注意:您需要 确保在创建临时表后没有新数据插入到表中
CUSTOMER_PART
。
方法2:
通过使用 input_file_name() 函数:
检查每个分区中有多少个不同的文件名,然后仅选择每个分区中具有
10..etc
个以上文件的分区。使用这些分区创建
temporary table
,并仅overwrite the final table
选定的分区。
注意:您需要确保在创建临时表后没有新数据插入
CUSTOMER_PART
表中,因为我们将覆盖最终表。
方法3:
Hive(不是 Spark)提供覆盖并选择相同的表 .i.e
insert overwrite table default.t1 partition(partiton_column)
select * from default.t1; //overwrite and select from same t1 table
如果您遵循这种方式,那么一旦您的火花作业完成,就需要
hive job triggered
。Hive 将在
running overwrite/select
同一表时获取锁,因此如果任何正在写入表的作业将等待。
此外:Orc format
将提供连接,它将合并小的ORC文件以创建新的大文件。
alter table <db_name>.<orc_table_name> [partition_column="val"] concatenate;