Custom payload class in Python for preCombine and combineAndGetUpdateValue in Apache Hudi with PySpark



We are migrating our codebase from Spark-Java to PySpark. We handle custom aggregations for merging data with preCombine() and combineAndGetUpdateValue(), and we implemented this in our Spark-Java code. An example is shown below:

package com.paytm.sparkjobs.utils.hudi;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MergeMdrPayloadAndPersist extends BaseAvroPayload implements HoodieRecordPayload<MergeMdrPayloadAndPersist> {

    public static final Logger logger = LoggerFactory.getLogger(MergeMdrPayloadAndPersist.class);
    private GenericRecord record = null;

    public MergeMdrPayloadAndPersist(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
        this.record = record;
    }

    @Override
    public MergeMdrPayloadAndPersist preCombine(MergeMdrPayloadAndPersist mergeMdrPayloadAndPersist) {
        // custom logic for aggregations
        return new MergeMdrPayloadAndPersist(mergeMdrPayloadAndPersist.record, mergeMdrPayloadAndPersist.orderingVal);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord indexedRecord, Schema schema) throws IOException {
        // custom logic for aggregations; merge the incoming record into this payload
        MergeMdrPayloadAndPersist mergedDoc = new MergeMdrPayloadAndPersist(this.record, this.orderingVal);
        return mergedDoc.getInsertValue(schema);
    }

    @Override
    public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
        if (this.recordBytes.length == 0) {
            return Option.empty();
        } else {
            IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(this.recordBytes, schema);
            return this.isDeleteRecord((GenericRecord) indexedRecord) ? Option.empty() : Option.of(indexedRecord);
        }
    }

    private boolean isDeleteRecord(GenericRecord genericRecord) {
        Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
        return deleteMarker instanceof Boolean && (Boolean) deleteMarker;
    }
}

Could someone explain how we can write a custom payload class/function in Python to handle our aggregation and merge logic? Some code examples would be helpful.

There is no way to implement this in PySpark. Hudi does not have a Python API of its own; it relies on the Spark Python API, which interacts with the underlying Java/Scala classes via py4j, and you cannot create a Java class through py4j, because the class has to exist as compiled Java code before it can be used.

The best approach is to create a small Java jar containing your class and add it to the PySpark shell/submit.
