在具有不同路径的单个接收器中发射"Side Outputs"和"process output"



如何使用单个接收器发射"侧输出"one_answers"进程输出"。这里,在这种情况下,两个输出都需要发射到单个接收器,并且基于标签文件夹路径不同

例如

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};    
SingleOutputStreamOperator<String> mainDataStream = source.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
try {
builder.parse(new InputSource(new StringReader(value)));
out.collect(value);
} catch (SAXException | IOException e) {
ctx.output(outputTag, value);
}
}
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

还有其他更好的解决方案吗?只是担心的性能

如果您想使用单个接收器,可以在输出格式中添加一个属性,并使用该属性来标识单个接收器中的数据源。

您还可以构造两个具有不同参数的接收器,以接收来自不同源的数据。在我看来,在不考虑您使用的数据库的情况下,这种多线程方式具有更好的性能。

Flink的BucketingSink可以使用Bucketer来确定将使用基本目录中的哪个子目录。因此,您可以使用它根据正在写入的记录中的属性设置子目录。

就使用单个接收器而言,由于函数的主输出和副输出都是String对象(相同类型(,因此可以在输出结果之前将两个流mainDataStream.union(sideOutputStream)在一起。

相关内容

  • 没有找到相关文章

最新更新