我有两个DataStreams
,第一个称为DataStream<String> source
,它从消息代理接收记录,第二个是SingleOutputOperator<Event> events
,它是将源映射到Event.class
的结果。
我有一个用例需要使用SingleOutputOperator<Event> events
,另一个用例使用DataStream<String> source
。在使用DataStream<String> source
的一个用例中,我需要在应用一些过滤器后加入SingleOutputOperator<String> result
,并避免再次将source
映射到Event.class
,因为我已经完成了该操作和Stream
,我需要将SingleOutputOperator<String> result
中的每个记录搜索到SingleOutputOperator<Event> events
中,并应用另一个映射来导出SingleOutputOperator<EventOutDto> out
。
这是一个例子:
DataStream<String> source = env.readFrom(source);
SingleOutputOperator<Event> events = source.map(s -> mapper.readValue(s, Event.class));
public void filterAndJoin(DataStream<String> source, SingleOutputOperator<Event> events){
SingleOutputOperator<String> filtered = source.filter(s -> new FilterFunction());
SingleOutputOperator<EventOutDto> result = (this will be the result of search each record
based on id in the filtered stream into the events stream where the id must match and return the event if found)
.map(event -> new EventOutDto(event)).addSink(new RichSinkFunction());
}
我有这个代码:
filtered.join(events)
.where(k -> {
JsonNode tree = mapper.readTree(k);
String id = "";
if (tree.get("Id") != null) {
id = tree.get("Id").asText();
}
return id;
})
.equalTo(e -> {
return e.Id;
})
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<String, Event, BehSingleEventTriggerDTO>() {
@Override
public EventOutDto join(String s, Event event) throws Exception {
return new EventOutDto(event);
}
})
.addSink(new SinkFunction());
在上面的代码中,所有的工作都很好,ids
是一样的,所以基本上where(id).equalTo(id)
应该工作,但这个过程从来没有达到apply
的功能。
观察:Watermark
被分配了相同的时间戳
问题:
- 知道为什么吗
- 我解释得很好吗
我通过以下操作解决了连接:
SingleOutputStreamOperator<ObjectDTO> triggers = candidates
.keyBy(new KeySelector())
.intervalJoin(keyedStream.keyBy(e -> e.Id))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new new ProcessFunctionOne())
.keyBy(k -> k.otherId)
.process(new ProcessFunctionTwo());