是否可以将flink数据流的输出捕获到列表中



我刚开始退缩,不知道这是正确的方法还是愚蠢的事情我有一个字符串数据类型的数据流,我正试图将数据流中的数据捕获到列表中,我正在尝试类似下面的方法

public class DataCapture {
public static List<String> stringList(DataStream<String> dataStream) {
List<String> myOutputlist = new ArrayList<>();
dataStream.flatMap(new FlatMapFunction<String, List<String>>() {
@Override
public void flatMap(String value, Collector<List<String>> out) throws Exception {
System.out.println("==================DATASTREAM-VALUE=====================" +value);
myOutputlist.add(value);
out.collect(myOutputlist);
}
});
return myOutputlist;
}
}

有没有办法我可以把它放进一个列表中,我甚至尝试添加一个接收器,并尝试将输出捕获到同样不起作用的列表事件中

不确定生产代码,但在几个测试用例中,我在List周围使用了CollectSink包装器,类似于这个:

// a testing sink
class CollectSink implements SinkFunction<String> {
// must be static
public static final List<String> values = new CopyOnWriteArrayList<>();
@Override
public synchronized void invoke(String value) throws Exception {
values.add(value);
}
}

此接收器将收集value列表中的元素。您只需要将此接收器添加到管道中。

更新:正如@kkrugler所指出的,用线程安全的CopyOnWriteArrayListList实现替换了ArrayList,以便能够安全地使用这个并行度大于1的接收器。

相关内容

  • 没有找到相关文章

最新更新