I push an initial bulk load into a Hudi table and then write incremental data into it every day. But when a back-dated record arrives, the newer precombine field value already stored in the table is ignored and the arriving (older) record overwrites it.
I write a DataFrame containing the data below, using the configuration that follows it:
+---+-----+-------------+
| id| req|dms_timestamp|
+---+-----+-------------+
| 1| one| 2022-12-17|
| 2| two| 2022-12-17|
| 3|three| 2022-12-17|
+---+-----+-------------+
"className"-> "org.apache.hudi",
"hoodie.datasource.write.precombine.field"-> "dms_timestamp",
"hoodie.datasource.write.recordkey.field"-> "id",
"hoodie.table.name"-> "hudi_test",
"hoodie.consistency.check.enabled"-> "false",
"hoodie.datasource.write.reconcile.schema"-> "true",
"path"-> basePath,
"hoodie.datasource.write.keygenerator.class"-> "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.partitionpath.field"-> "",
"hoodie.datasource.write.hive_style_partitioning"-> "true",
"hoodie.upsert.shuffle.parallelism"-> "1",
"hoodie.datasource.write.operation"-> "upsert",
"hoodie.cleaner.policy"-> "KEEP_LATEST_COMMITS",
"hoodie.cleaner.commits.retained"-> "5",
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |req |dms_timestamp|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|20221214130513893 |20221214130513893_0_0|id:3 | |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|3 |three|2022-12-17 |
|20221214130513893 |20221214130513893_0_1|id:1 | |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|1 |one |2022-12-17 |
|20221214130513893 |20221214130513893_0_2|id:2 | |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|2 |two |2022-12-17 |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
Then in the next run I pass in the following data:
+---+----+-------------+
| id| req|dms_timestamp|
+---+----+-------------+
| 1|null| 2019-01-01|
+---+----+-------------+
"hoodie.table.name"-> "hudi_test",
"hoodie.datasource.write.recordkey.field" -> "id",
"hoodie.datasource.write.precombine.field" -> "dms_timestamp",
// get_common_config
"className"-> "org.apache.hudi",
"hoodie.datasource.hive_sync.use_jdbc"-> "false",
"hoodie.consistency.check.enabled"-> "false",
"hoodie.datasource.write.reconcile.schema"-> "true",
"path"-> basePath,
// get_partitionDataConfig -- no partitionfield
"hoodie.datasource.write.keygenerator.class"-> "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.partitionpath.field"-> "",
"hoodie.datasource.write.hive_style_partitioning"-> "true",
// get_incrementalWriteConfig
"hoodie.upsert.shuffle.parallelism"-> "1",
"hoodie.datasource.write.operation"-> "upsert",
"hoodie.cleaner.policy"-> "KEEP_LATEST_COMMITS",
"hoodie.cleaner.commits.retained"-> "5",
and get this table:
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |req |dms_timestamp|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|20221214131440563 |20221214131440563_0_0|id:3 | |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|3 |three|2022-12-17 |
|20221214131555500 |20221214131555500_0_1|id:1 | |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|1 |null |2019-01-01 |
|20221214131440563 |20221214131440563_0_2|id:2 | |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|2 |two |2022-12-17 |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
This should not happen, since this is stale data arriving late in the stream. How can I handle this situation?
By default, Hudi uses org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
as the payload class. With this class, Hudi only uses the precombine field to deduplicate the incoming batch (the precombine step) and then overwrites the existing record with the incoming one, without comparing the precombine field values.
If you want to always keep the most recently updated record, you need to add the following configuration:
"hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload"