我是Apache beam的新手,我正在使用Apache光束,并在GCP中使用Dataflow作为运行器。我在执行管道时遇到以下错误。
coder of type class org.apache.beam.sdk.coders.ListCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element [Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:06:02.000Z, companyId=242, startTime=2020-04-01T09:00:33.000Z], Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:07:47.000Z, companyId=242, startTime=2020-04-01T09:06:03.000Z], Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:48:25.000Z, companyId=242, startTime=2020-04-01T09:07:48.000Z]]
PCollection类似于PCollection
我已经将 Person 实现为可序列化的 POJO 类,并覆盖等于和哈希方法。但我认为我也需要为人编写自定义 ListCoder 并在管道中注册。 我不确定如何解决此问题,请帮助。
这是一个工作示例。 如果克隆存储库,请在playground
根目录下运行./gradlew run
,然后可以验证效果。您还可以使用./gradlew run --args='--runner=DataflowRunner --project=$YOUR_PROJECT_ID --tempLocation=gs://xxx/staging --stagingLocation=gs://xxx/staging'
运行以在数据流上运行它。
如果从头开始构建Person
类,它应如下所示:
class Person implements Serializable {
public Person(
String businessDay,
String departmentId,
String companyId
) {
this.businessDay = businessDay;
this.departmentId = departmentId;
this.companyId = companyId;
}
public String companyId() {
return companyId;
}
public String businessDay() {
return businessDay;
}
public String departmentId() {
return departmentId;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null) {
return false;
}
if (getClass() != other.getClass()) {
return false;
}
Person otherPerson = (Person) other;
return this.businessDay.equals(otherPerson.businessDay)
&& this.departmentId.equals(otherPerson.departmentId)
&& this.companyId.equals(otherPerson.companyId);
}
@Override
public int hashCode(){
return Objects.hash(this.businessDay, this.departmentId, this.companyId);
}
private final String businessDay;
private final String departmentId;
private final String companyId;
}
我推荐
使用自动值而不是从头开始创建 POJO。以下是一些示例。您可以在此处查看整个项目。优点是不必在每次创建新对象类型时都从头开始实现
equals
和hashCode
。在 KV 中,如果键是可迭代对象(如 List(,请将其包装在一个对象中并显式确定地序列化它(示例(,因为 Java 中的序列化是低级的。