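I have the following situation in an Apache Flink project.
There are 3 streams carrying different objects, for example:
Person -> String id, String firstName, String lastName (e.g. 101, John, Doe)
PersonDetail -> String id, String address, String city, String phoneNumber, long personId (e.g. 99, Stefansplatz 1, 43066012345678, 101)
PersonAddDetail -> String id, String addDetailType, Object addDetailValue, long personId (e.g. 77, 1, Hansi or 78, 2, 1234 or 80, 30, 3, true)
I want to aggregate (not sure this is the right term here) the objects from these streams into a new object that I put on a new stream. The aggregation should be based on the person ID. As an additional catch, I need to filter PersonAddDetail by specific addDetailType values (say I am only interested in objects of type 1 and type 2).
The aggregated object should look something like this:
PersonReport -> long id, String firstName, String lastName, String address, String city, String phoneNumber, ArrayList<PersonAddDetail> details
The question is whether this is possible at all and, if so, how I can accomplish it. Any input is welcome.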
Your problem sounds like a join operation. You could do something like this:
personDataStream.join(personDetailDataStream)
    .where(new KeySelector<Person, Long>() {
        ...
    })
    .equalTo(new KeySelector<PersonDetail, Long>() {
        ...
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(2)))
    .apply(new JoinFunction<Person, PersonDetail, PersonWithDetail>() {
        ...
    });
Note that a general join is not possible on unbounded (infinite) collections, so you need to bound it with windows.
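To see what the window buys you, here is a minimal plain-Java sketch (no Flink dependency) of what a tumbling-window join computes: events are bucketed by `timestampMs / windowSizeMs`, and only events that share both the key and the bucket are paired. The `Event` class, the timestamps and the `join` helper are assumptions made up for illustration; they are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of tumbling-window join semantics; this is not Flink code.
final class WindowJoinSketch {

    // Hypothetical event: join key, event-time timestamp in ms (non-negative), payload.
    static final class Event {
        final long key;
        final long timestampMs;
        final String payload;

        Event(long key, long timestampMs, String payload) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    // Pairs left and right events that share the same key AND the same tumbling window.
    static List<String> join(List<Event> left, List<Event> right, long windowSizeMs) {
        // Index the right side by "key@windowIndex" so every bucket stays finite.
        Map<String, List<Event>> buckets = new HashMap<>();
        for (Event r : right) {
            buckets.computeIfAbsent(bucketId(r, windowSizeMs), k -> new ArrayList<>()).add(r);
        }
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : buckets.getOrDefault(bucketId(l, windowSizeMs), List.of())) {
                joined.add(l.payload + "+" + r.payload);
            }
        }
        return joined;
    }

    private static String bucketId(Event e, long windowSizeMs) {
        return e.key + "@" + (e.timestampMs / windowSizeMs);
    }

    public static void main(String[] args) {
        List<Event> persons = List.of(new Event(101, 500, "John Doe"));
        List<Event> details = List.of(
                new Event(101, 1500, "Stefansplatz 1"), // same 2 s window as the person
                new Event(101, 2500, "late detail"));   // falls into the next window
        System.out.println(join(persons, details, 2000));
    }
}
```

With a 2 second window the detail at 2500 ms lands in a different bucket than the person at 500 ms and is therefore not paired. That is the price of bounding the join: elements that arrive in different windows never meet.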
Thanks to @jeremy's great comment I came up with a solution myself, and I want to share my thoughts and code. I introduced a new class called PersonContainer:
import java.util.ArrayList;
import java.util.List;

// Common wrapper so that all three input types can travel on one stream
public class PersonContainer {
    private String id;
    private Person person;
    private PersonDetail personDetail;
    private List<PersonAddDetail> personAddDetailList = new ArrayList<>();

    public PersonContainer(Person person) {
        this.id = person.getID();
        this.person = person;
    }

    public PersonContainer(PersonDetail personDetail) {
        this.id = personDetail.getOTTRID();
        this.personDetail = personDetail;
    }

    public PersonContainer(PersonAddDetail personAddDetail) {
        this.id = personAddDetail.getOTTRID();
        this.personAddDetailList.add(personAddDetail);
    }

    // Takes over the first part found in the other container
    public PersonContainer merge(PersonContainer other) {
        if (other.person != null) {
            this.person = other.person;
            return this;
        }
        if (other.personDetail != null) {
            this.personDetail = other.personDetail;
            return this;
        }
        if (other.personAddDetailList.size() > 0) {
            this.personAddDetailList.addAll(other.personAddDetailList);
            return this;
        }
        // nothing to take over; keep the current state instead of returning null
        return this;
    }

    public String getId() {
        return id;
    }

    public Person getPerson() {
        return person;
    }

    public PersonDetail getPersonDetail() {
        return personDetail;
    }

    public List<PersonAddDetail> getPersonAddDetailList() {
        return personAddDetailList;
    }

    // Complete once person, detail and more than one add-detail are present
    public boolean isComplete() {
        return person != null && personDetail != null && personAddDetailList.size() > 1;
    }
}
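This is the important part.
So here is what I did; the individual steps are described in the comments. In short, I map the three input streams to new streams of the newly introduced container. Then I union the three streams and use the iteration pattern to key these objects and merge them with my custom merge method. Finally, I use the custom isComplete method to tell the fully merged containers, which are mapped to the output, apart from the ones that are not complete yet, which are fed back into the merge process.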
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

// filter PersonAddDetail to keep just the types needed
DataStream<PersonAddDetail> filteredPersonAddDetail = unfilteredPersonAddDetail.filter(new FilterFunction<PersonAddDetail>() {
    @Override
    public boolean filter(PersonAddDetail personAddDetail) throws Exception {
        return personAddDetail.getAddDetailType().matches("1|2");
    }
});

// map Person stream to common object
DataStream<PersonContainer> mappedPersonStream = personInputStream.map(new MapFunction<Person, PersonContainer>() {
    @Override
    public PersonContainer map(Person person) throws Exception {
        return new PersonContainer(person);
    }
});

// map PersonDetail stream to common object
DataStream<PersonContainer> mappedPersonDetailStream = personDetailInputStream.map(new MapFunction<PersonDetail, PersonContainer>() {
    @Override
    public PersonContainer map(PersonDetail personDetail) throws Exception {
        return new PersonContainer(personDetail);
    }
});

// map PersonAddDetail stream to common object
DataStream<PersonContainer> mappedPersonAddDetailStream = filteredPersonAddDetail.map(new MapFunction<PersonAddDetail, PersonContainer>() {
    @Override
    public PersonContainer map(PersonAddDetail personAddDetail) throws Exception {
        return new PersonContainer(personAddDetail);
    }
});

// union the three input streams into one single stream
DataStream<PersonContainer> combinedInput = mappedPersonStream.union(mappedPersonDetailStream, mappedPersonAddDetailStream);

// the iteration pattern is in place here: I repeatedly try to merge corresponding objects together
IterativeStream<PersonContainer> iteration = combinedInput.iterate();

// group objects by their shared ID and then use reduce to merge them
DataStream<PersonContainer> iterationBody = iteration.keyBy(new KeySelector<PersonContainer, String>() {
    @Override
    public String getKey(PersonContainer personContainer) throws Exception {
        return personContainer.getId();
    }
})
.reduce(new ReduceFunction<PersonContainer>() {
    @Override
    public PersonContainer reduce(PersonContainer personContainer, PersonContainer other) throws Exception {
        return personContainer.merge(other);
    }
});

// use the container's isComplete method to check whether the merge is finished or we need to wait for further objects in the stream
DataStream<PersonContainer> containersNotCompleteYet = iterationBody.filter(new FilterFunction<PersonContainer>() {
    @Override
    public boolean filter(PersonContainer personContainer) throws Exception {
        return !personContainer.isComplete();
    }
});

// containers that are only partially merged, or not merged at all, are put back on the stream
iteration.closeWith(containersNotCompleteYet);

// fully merged containers are processed further
DataStream<PersonContainer> completeContainers = iterationBody.filter(new FilterFunction<PersonContainer>() {
    @Override
    public boolean filter(PersonContainer personContainer) throws Exception {
        return personContainer.isComplete();
    }
});

// finally the container is mapped to the correct output object
DataStream<PersonReport> personReport = completeContainers.map(new MapFunction<PersonContainer, PersonReport>() {
    @Override
    public PersonReport map(PersonContainer personContainer) throws Exception {
        // map personContainer to final PersonReport
        return personContainer;
    }
});
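A note on the type filter in the first step: `String.matches` only succeeds when the whole string matches the pattern, so `"1|2"` accepts exactly the types `1` and `2` and rejects something like `12`. The following is a small sketch, independent of Flink. The class name `TypeFilterSketch`, the helper names and the sample values are made up for illustration, and the `Set` based variant is only an optional alternative, not something taken from the code above.

```java
import java.util.Set;

final class TypeFilterSketch {

    // Hypothetical alternative: a fixed set of wanted types for a plain lookup.
    private static final Set<String> WANTED_TYPES = Set.of("1", "2");

    // Same check as in the filter step: the whole string must match "1" or "2".
    static boolean acceptByRegex(String addDetailType) {
        return addDetailType.matches("1|2");
    }

    // Set lookup; avoids compiling the pattern again for every element.
    static boolean acceptBySet(String addDetailType) {
        return WANTED_TYPES.contains(addDetailType);
    }

    public static void main(String[] args) {
        for (String type : new String[] {"1", "2", "3", "12"}) {
            System.out.println(type + " -> " + acceptByRegex(type) + " / " + acceptBySet(type));
        }
    }
}
```

Both variants agree on these inputs; which one reads better is a matter of taste.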
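This approach works for me. The nice thing is that I can handle objects that arrive late on the stream (for example, a PersonAddDetail showing up a few minutes after the other objects), and I don't need to define any kind of window. Thanks for your input.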
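The late-arrival behaviour can be illustrated without Flink. Below is a minimal plain-Java sketch, assuming a per-ID map as a stand-in for state that is kept per key. `KeyedMergeSketch`, `Partial` and the `onPerson`/`onDetail`/`onAddDetail` methods are hypothetical names, and plain strings replace the real classes. Each incoming element is merged into the entry for its ID, and the entry is emitted and cleared once all parts are present, no matter how late the last one arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for per-key merging; the map plays the role of keyed state.
final class KeyedMergeSketch {

    // Hypothetical partial result for one person ID (strings replace the real classes).
    static final class Partial {
        String person;
        String detail;
        final List<String> addDetails = new ArrayList<>();

        // Same rule as PersonContainer.isComplete(): all parts plus more than one add-detail.
        boolean isComplete() {
            return person != null && detail != null && addDetails.size() > 1;
        }
    }

    private final Map<String, Partial> stateById = new HashMap<>();

    Optional<String> onPerson(String id, String person) {
        Partial partial = stateById.computeIfAbsent(id, k -> new Partial());
        partial.person = person;
        return emitIfComplete(id, partial);
    }

    Optional<String> onDetail(String id, String detail) {
        Partial partial = stateById.computeIfAbsent(id, k -> new Partial());
        partial.detail = detail;
        return emitIfComplete(id, partial);
    }

    Optional<String> onAddDetail(String id, String addDetail) {
        Partial partial = stateById.computeIfAbsent(id, k -> new Partial());
        partial.addDetails.add(addDetail);
        return emitIfComplete(id, partial);
    }

    // Emits the merged record and drops its state once every part has arrived.
    private Optional<String> emitIfComplete(String id, Partial partial) {
        if (!partial.isComplete()) {
            return Optional.empty();
        }
        stateById.remove(id);
        return Optional.of(partial.person + " | " + partial.detail + " | " + partial.addDetails);
    }

    public static void main(String[] args) {
        KeyedMergeSketch merger = new KeyedMergeSketch();
        // Arrival order does not matter: an add-detail comes first, another one last.
        System.out.println(merger.onAddDetail("101", "1=Hansi"));
        System.out.println(merger.onPerson("101", "John Doe"));
        System.out.println(merger.onDetail("101", "Stefansplatz 1"));
        System.out.println(merger.onAddDetail("101", "2=1234"));
    }
}
```

Only the last call returns a non-empty result. In the sketch, entries for IDs that never become complete stay in the map forever, so a real job would need some cleanup policy. In Flink itself this role would typically be played by keyed state inside a process function such as `KeyedProcessFunction`; that is a different technique from the iteration used above and is mentioned only as a pointer.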