Apache Flink在特定密钥上加入不同的数据流



我有两个DataStreams,第一个称为DataStream<String> source,它从消息代理接收记录,第二个是SingleOutputOperator<Event> events,它是将源映射到Event.class的结果。

我有一个用例需要使用SingleOutputOperator<Event> events,另一个用例使用DataStream<String> source。在使用DataStream<String> source的一个用例中,我需要在应用一些过滤器后加入SingleOutputOperator<String> result,并避免再次将source映射到Event.class,因为我已经完成了该操作和Stream,我需要将SingleOutputOperator<String> result中的每个记录搜索到SingleOutputOperator<Event> events中,并应用另一个映射来导出SingleOutputOperator<EventOutDto> out

这是一个例子:

DataStream<String> source = env.readFrom(source);
SingleOutputOperator<Event> events = source.map(s -> mapper.readValue(s, Event.class));

public void filterAndJoin(DataStream<String> source, SingleOutputOperator<Event> events){

SingleOutputOperator<String> filtered = source.filter(s -> new FilterFunction());

SingleOutputOperator<EventOutDto> result = (this will be the result of search each record 
based on id in the filtered stream into the events stream where the id must match and return the event if found)
.map(event -> new EventOutDto(event)).addSink(new RichSinkFunction());
}

我有这个代码:

filtered.join(events)
.where(k -> {
JsonNode tree = mapper.readTree(k);
String id = "";
if (tree.get("Id") != null) {
id = tree.get("Id").asText();
}
return id;
})
.equalTo(e -> {
return e.Id;
})
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<String, Event, BehSingleEventTriggerDTO>() {
@Override
public EventOutDto join(String s, Event event) throws Exception {
return new EventOutDto(event);
}
})
.addSink(new SinkFunction());

在上面的代码中,所有的工作都很好,ids是一样的,所以基本上where(id).equalTo(id)应该工作,但这个过程从来没有达到apply的功能。

观察:Watermark被分配了相同的时间戳

问题:

  • 知道为什么吗
  • 我解释得很好吗

我通过以下操作解决了连接:

SingleOutputStreamOperator<ObjectDTO> triggers = candidates
.keyBy(new KeySelector())
.intervalJoin(keyedStream.keyBy(e -> e.Id))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new new ProcessFunctionOne())
.keyBy(k -> k.otherId)
.process(new ProcessFunctionTwo());

相关内容

  • 没有找到相关文章

最新更新