Hudi overwrites table with stale data



I push some initial bulk data into a Hudi table and then write incremental data into it every day. But when back-dated data arrives late, the newer precombine field value that already exists in the table is ignored, and the arriving (older) precombine value overwrites it.

I write a dataframe containing the following data with the following options:

+---+-----+-------------+
| id|  req|dms_timestamp|
+---+-----+-------------+
|  1|  one|   2022-12-17|
|  2|  two|   2022-12-17|
|  3|three|   2022-12-17|
+---+-----+-------------+
"className"-> "org.apache.hudi",
"hoodie.datasource.write.precombine.field"-> "dms_timestamp",
"hoodie.datasource.write.recordkey.field"-> "id",
"hoodie.table.name"-> "hudi_test",
"hoodie.consistency.check.enabled"-> "false",
"hoodie.datasource.write.reconcile.schema"-> "true",
"path"-> basePath,
"hoodie.datasource.write.keygenerator.class"-> "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.partitionpath.field"-> "",
"hoodie.datasource.write.hive_style_partitioning"-> "true",
"hoodie.upsert.shuffle.parallelism"-> "1",
"hoodie.datasource.write.operation"-> "upsert",
"hoodie.cleaner.policy"-> "KEEP_LATEST_COMMITS",
"hoodie.cleaner.commits.retained"-> "5",
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |id |req  |dms_timestamp|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|20221214130513893  |20221214130513893_0_0|id:3              |                      |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|3  |three|2022-12-17   |
|20221214130513893  |20221214130513893_0_1|id:1              |                      |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|1  |one  |2022-12-17   |
|20221214130513893  |20221214130513893_0_2|id:2              |                      |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|2  |two  |2022-12-17   |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+

Then, in the next run, I write the following data:

+---+----+-------------+
| id| req|dms_timestamp|
+---+----+-------------+
|  1|null|   2019-01-01|
+---+----+-------------+
"hoodie.table.name"-> "hudi_test",
"hoodie.datasource.write.recordkey.field" -> "id",
"hoodie.datasource.write.precombine.field" -> "dms_timestamp",
// get_common_config
"className"-> "org.apache.hudi",
"hoodie.datasource.hive_sync.use_jdbc"-> "false",
"hoodie.consistency.check.enabled"-> "false",
"hoodie.datasource.write.reconcile.schema"-> "true",
"path"-> basePath,
// get_partitionDataConfig -- no partitionfield
"hoodie.datasource.write.keygenerator.class"-> "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.partitionpath.field"-> "",
"hoodie.datasource.write.hive_style_partitioning"-> "true",
// get_incrementalWriteConfig
"hoodie.upsert.shuffle.parallelism"-> "1",
"hoodie.datasource.write.operation"-> "upsert",
"hoodie.cleaner.policy"-> "KEEP_LATEST_COMMITS",
"hoodie.cleaner.commits.retained"-> "5",

and I get this table:

+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |id |req  |dms_timestamp|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|20221214131440563  |20221214131440563_0_0|id:3              |                      |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|3  |three|2022-12-17   |
|20221214131555500  |20221214131555500_0_1|id:1              |                      |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|1  |null |2019-01-01   |
|20221214131440563  |20221214131440563_0_2|id:2              |                      |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|2  |two  |2022-12-17   |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+

This should not happen, because this is stale data that arrived late in the stream. How can this case be handled?

By default, Hudi uses org.apache.hudi.common.model.OverwriteWithLatestAvroPayload as the payload class. With this class, Hudi uses the precombine field only to deduplicate the incoming data (the precombine step), and then overwrites the existing record with the new one without comparing the precombine field values.
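
The difference can be seen in a small conceptual sketch (this illustrates the merge semantics only and is not the actual Hudi source; Record and orderingValue are made-up names for this example):

// Simplified model of a row; orderingValue plays the role of the
// precombine field (dms_timestamp in this table).
case class Record(id: Int, req: Option[String], orderingValue: String)

// OverwriteWithLatestAvroPayload: when merging with the record already
// stored in the table, the incoming record always wins, even if older.
def overwriteWithLatest(existing: Record, incoming: Record): Record =
  incoming

// DefaultHoodieRecordPayload: the ordering (precombine) value is also
// compared at merge time, so a stale incoming record is discarded.
def keepLatestByOrdering(existing: Record, incoming: Record): Record =
  if (incoming.orderingValue.compareTo(existing.orderingValue) >= 0) incoming
  else existing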

If you want to always keep the most recently updated record, you need to add the following config:

"hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload"
