How to get a KeyedStream back after applying different filters to a stream that was already keyed



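How can I fan out the same KeyedStream and apply a different filter for each use case, without having to create a new KeyedStream at the end of each filter? Example: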

DataStream<Event> streamFiltered = RabbitMQConnector.eventStreamObject(env)
        .flatMap(new Consumer())
        .name("Event Mapper")
        .assignTimestampsAndWatermarks(new PeriodicExtractor())
        .name("Watermarks Added")
        .filter(new NullIdEventsFilterFunction())
        .name("Event Filter");
/* Now I need to send the same keyed stream to two different transformations,
   each with its own filter, but keyed by the same id. */
/* Applying a filter gives me back a SingleOutputStreamOperator, so normally I have to call keyBy again. */
/* That second keyBy is what I want to avoid. */
KeyedStream<Event, String> keyed1 = streamFiltered.filter(x -> x.id != null).keyBy(key -> key.id); /* want to avoid this */
KeyedStream<Event, String> keyed2 = streamFiltered.filter(x -> x.id.length() > 10).keyBy(key -> key.id); /* want to avoid this */
seeProduct(keyed1);
checkProduct(keyed2);
/* These are just examples: both operations take a KeyedStream keyed the same way, but with a different
   filter applied. I want to reuse the keyed stream I already created after each filter instead of creating a new one. */
private static SingleOutputStreamOperator<EventProduct> seeProduct(KeyedStream<Event, String> stream) {
    return stream.map(x -> new EventProduct(x)).name("Event Product");
}

private static SingleOutputStreamOperator<EventCheck> checkProduct(KeyedStream<Event, String> stream) {
    return stream.map(x -> new EventCheck(x)).name("Event Check");
}

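Normally each filter function returns a SingleOutputStreamOperator, and then I have to call keyBy again. But I already have a KeyedStream by id, and my idea is to get that back after the filter; instead, I have to call keyBy again and create a new KeyedStream. Is there any way to keep the KeyedStream after applying a filter?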

I think the side output feature can help in your case: for each filter scenario, you can get a separate side output from the base keyed stream.

See the Flink side output documentation for more details and examples: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html.

Something like this (a sketch, where source is the DataStream<Event> from the question) should work for you:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// one tag per filter use case; the trailing {} keeps the generic type information at runtime
final OutputTag<Event> outputTag1 = new OutputTag<Event>("side-output-filter-1") {};
final OutputTag<Event> outputTag2 = new OutputTag<Event>("side-output-filter-2") {};

// getSideOutput() is defined on SingleOutputStreamOperator, not on DataStream
SingleOutputStreamOperator<Event> keyedStream = source
        .keyBy(x -> x.id)
        .process(new KeyedProcessFunction<String, Event, Event>() {
            @Override
            public void processElement(
                    Event value,
                    Context ctx,
                    Collector<Event> out) throws Exception {
                // emit data to regular output
                out.collect(value);
                // emit the same element to each side output
                ctx.output(outputTag1, value);
                ctx.output(outputTag2, value);
            }
        });
/* use case one: same key, first filter (the side output itself is a plain DataStream) */
DataStream<Event> sideOutputStream1 = keyedStream.getSideOutput(outputTag1).filter(x -> x.id != null);
/* use case two: same key, second filter */
DataStream<Event> sideOutputStream2 = keyedStream.getSideOutput(outputTag2).filter(x -> x.id.length() > 10);

It seems the simplest answer is to apply the filtering first, and then use keyBy.
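A minimal sketch of filter-first-then-keyBy, assuming a stripped-down Event with only a public String id (a hypothetical stand-in for the question's class) and Flink 1.12 or later, where DataStream.executeAndCollect exists. Both branches read the same upstream stream and pass the same KeySelector to keyBy, so they are partitioned identically; each branch still does its own shuffle.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterThenKeyBy {

    /** Hypothetical stand-in for the question's Event: a Flink POJO with just an id. */
    public static class Event {
        public String id;
        public Event() {}
        public Event(String id) { this.id = id; }
    }

    // One key selector shared by both branches, so both keyBy calls partition the same way.
    static final KeySelector<Event, String> BY_ID = e -> e.id;

    /** Builds both branches and unions their tagged results so they are easy to inspect. */
    public static DataStream<String> build(StreamExecutionEnvironment env) {
        DataStream<Event> events = env.fromElements(
                new Event("short"), new Event("a-rather-long-id"), new Event(""));

        // Use case 1: filter first, then key by id.
        DataStream<String> seen = events
                .filter(e -> e.id != null && !e.id.isEmpty())
                .keyBy(BY_ID)
                .map(e -> "see:" + e.id);

        // Use case 2: a different filter, same key selector.
        DataStream<String> checked = events
                .filter(e -> e.id != null && e.id.length() > 10)
                .keyBy(BY_ID)
                .map(e -> "check:" + e.id);

        return seen.union(checked);
    }
}
```

The map to String and the union only exist to make the result easy to check; in the question's job, keyed1 and keyed2 would go to seeProduct and checkProduct instead.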

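If, for some reason, you need to key-partition the stream before filtering (for example, because you are applying a RichFilterFunction that uses keyed state), then you can use reinterpretAsKeyedStream to re-establish the keying without another keyBy.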
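A sketch of the reinterpretAsKeyedStream route, assuming the Flink 1.x DataStream API (RichFunction.open(Configuration); newer releases replace it with open(OpenContext)). AtMostTwoPerKey and RunningCount are hypothetical functions that only exist to show keyed state being used before and after the reinterpretation. Flink documents DataStreamUtils.reinterpretAsKeyedStream as an experimental feature.

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReinterpretAfterKeyedFilter {

    /** Hypothetical stand-in for the question's Event. */
    public static class Event {
        public String id;
        public Event() {}
        public Event(String id) { this.id = id; }
    }

    static final KeySelector<Event, String> BY_ID = e -> e.id;

    /** Hypothetical keyed filter: lets at most two events per id through (needs keyed state). */
    public static class AtMostTwoPerKey extends RichFilterFunction<Event> {
        private transient ValueState<Integer> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Integer.class));
        }

        @Override
        public boolean filter(Event e) throws Exception {
            int n = seen.value() == null ? 0 : seen.value();
            seen.update(n + 1);
            return n < 2;
        }
    }

    /** Keyed running count per id; only works because the input is (re)interpreted as keyed. */
    public static class RunningCount extends RichMapFunction<Event, String> {
        private transient ValueState<Integer> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
        }

        @Override
        public String map(Event e) throws Exception {
            int n = (count.value() == null ? 0 : count.value()) + 1;
            count.update(n);
            return e.id + "#" + n;
        }
    }

    public static DataStream<String> build(StreamExecutionEnvironment env) {
        DataStream<Event> filtered = env
                .fromElements(new Event("a"), new Event("a"), new Event("a"), new Event("b"))
                .keyBy(BY_ID)                    // the only shuffle in the job
                .filter(new AtMostTwoPerKey());  // runs in keyed context; elements stay on their subtask

        // No second keyBy: declare that the filter output is still partitioned by BY_ID.
        KeyedStream<Event, String> keyedAgain =
                DataStreamUtils.reinterpretAsKeyedStream(filtered, BY_ID);

        return keyedAgain.map(new RunningCount());
    }
}
```

The reinterpretation is safe here because the filter neither changes the key nor moves elements between subtasks, and the parallelism stays the same between the filter and RunningCount.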

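Using side outputs is a good way to split a stream into several filtered substreams, but those output streams will not be KeyedStreams. reinterpretAsKeyedStream can only be used safely if re-applying the key selector function would produce exactly the same partitioning as the existing one.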
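Combining side outputs with reinterpretAsKeyedStream gives a job with a single keyBy: a KeyedProcessFunction routes each event into one side output per filter, and each side output is then reinterpreted as keyed. This is a sketch under these assumptions: ids are never null (keyBy rejects null keys, which is why the question drops them first), the key is never modified, and parallelism does not change between the process function and the reinterpreted operators. Router, SEE, CHECK and the non-empty-id predicate are illustrative names, not from the thread.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class KeyedSideOutputs {

    /** Hypothetical stand-in for the question's Event. */
    public static class Event {
        public String id;
        public Event() {}
        public Event(String id) { this.id = id; }
    }

    static final KeySelector<Event, String> BY_ID = e -> e.id;
    static final OutputTag<Event> SEE = new OutputTag<Event>("see") {};
    static final OutputTag<Event> CHECK = new OutputTag<Event>("check") {};

    /** Applies both filters inside the keyed operator and routes matches to side outputs. */
    public static class Router extends KeyedProcessFunction<String, Event, Event> {
        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) {
            if (!e.id.isEmpty()) {        // placeholder predicate for use case one
                ctx.output(SEE, e);
            }
            if (e.id.length() > 10) {     // use case two, as in the question
                ctx.output(CHECK, e);
            }
        }
    }

    public static DataStream<String> build(StreamExecutionEnvironment env) {
        SingleOutputStreamOperator<Event> routed = env
                .fromElements(new Event("short"), new Event("a-rather-long-id"), new Event(""))
                .keyBy(BY_ID)                 // the only keyBy in the job
                .process(new Router());

        // Side outputs are plain DataStreams; reinterpret them instead of calling keyBy again.
        DataStream<String> seen = DataStreamUtils
                .reinterpretAsKeyedStream(routed.getSideOutput(SEE), BY_ID)
                .map(e -> "see:" + e.id);
        DataStream<String> checked = DataStreamUtils
                .reinterpretAsKeyedStream(routed.getSideOutput(CHECK), BY_ID)
                .map(e -> "check:" + e.id);
        return seen.union(checked);
    }
}
```

Each side-output element is emitted by the subtask that owns its key and is not re-keyed afterwards, so re-applying BY_ID would give the same partitioning, which is the condition stated above.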
