Apache Flink 如何映射和匹配具有主键的备用键到一个键流



我想要一种更简单、更好、更优雅的方法来解决下面的问题。我还没有遇到任何关于这个主题的文档,我相信我目前的方法有一些瓶颈,谢谢

我有一个流,Json被映射到POJO

DataStream<MYPOJO> stream = env.
addSource( <<kafkaSource>>).map(new EventToPOJO());

POJO的一些字段将有一个填充的主键,有些字段将有填充的备用键。有些字段将两者都有。我在Flink文档中找到的使用两个键的唯一例子是,对复合键使用keyselector,但对备用键不使用keyselelector

我目前的方法如下:

  1. 使用richFlatMapFunction将主键的所有元素收集到流中,Astream
  2. 使用richFlatMapFunction将备用密钥的所有元素收集到流中,BStream
  3. USe richFlatMap用于同时具有主键和备用键的项,CStream
  4. 使用主键上的Cstream加入Astream
  5. 在备用密钥上加入Bstream和Cstream
  6. finally KeyBy主键

DataStream<MyPOJO> primaryKey = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
@Override
public void flatMap(MyPOJO mypojo, Collector<MyPOJO> collector) throws Exception {
if(mypojo.PrimaryKey() != null){

collector.collect(MyPOJO);
}
}
});

DataStream<MyPOJO> alternateKey = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
@Override
public void flatMap(MyPOJO mypojo, Collector<MyPOJO> collector) throws Exception {
if(mypojo.getAlternateKey() != null){

collector.collect(mypojo);
}
}
});

DataStream<MyPOJO> both = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
@Override
public void flatMap(MyPOJO mypojo, Collector<MYPOJO> collector) throws Exception {
if(mypojo.getAlternateKey() != null && mypojo.getPrimaryKey() !=null ){

collector.collect(mypojo);
}
}
});

//Join them 
both.join(alternateKey)
.where(MyPOJO::getAlternateKey)
.equalTo(MyPOJO::getAlternateKey)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
.apply (new JoinFunction<MyPOJO, MyPOJO, MyPOJO>(){

@Override
public StateObject join(MyPOJO Mypojo, MyPOJO mypojo2) throws Exception {
// Some Join logic to keep both states 
return stateObject2;
}
});
:: repeat for primary key stream ...

// keyby at the end
both.keyBy(MyPOJO::getPrimaryKey)

我相信我也可以使用过滤函数来实现这3个流,但我不想一开始就分裂成3个流。为了可读性,我简化了上面的内容,所以请不要介意你可能发现的任何语法错误。

您应该实现一个自定义POJO,该POJO包含主&副钥匙。它需要有equals()hashCode()方法,它们实现两条记录相等时所需的逻辑(*(。看见hashCode((和equals((方法,了解为什么要这样做的更多细节。

添加一个返回此自定义POJO的MyPOJO.getJoiningKey()

然后只需基于.where(r -> r.getJoiningKey()).equals(r -> r.getJoiningKey())执行单个联接。

(*(我仍然不确定你希望你的逻辑是什么;副密钥不为空,右侧主键为空,但副密钥不是空,您想比较什么?

最新更新