在Flink中,当我们有两个或多个操作符同时输出相同数据类型的记录时,我们可以重用数据输出数据类型的OutputTag吗?
示例:
OutputTag<A> sideOutputTag = new OutputTag<A>("side-output") {};
ProcessFunction1 processFunction1 = new ProcessFunction1(sideOutputTag);
ProcessFunction2 processFunction2 = new ProcessFunction2(sideOutputTag);
SingleOutputStreamOperator<A> output1 = input.process(processFunction1).getSideOutput(sideOutputTag);
SingleOutputStreamOperator<A> output2 = input.process(processFunction2).getSideOutput(sideOutputTag);
在这种方法中,output1
是否包含由processFunction2
处理的输出?或者,output1
和output2
是否分别包含processFunction1
和processFunction2
处理的记录?
谢谢!
您可以重用同一个标记,结果流将是不同的。例如:
final OutputTag<String> errors = new OutputTag<String>("errors"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));