我刚开始退缩,不知道这是正确的方法还是愚蠢的事情我有一个字符串数据类型的数据流,我正试图将数据流中的数据捕获到列表中,我正在尝试类似下面的方法
public class DataCapture {
public static List<String> stringList(DataStream<String> dataStream) {
List<String> myOutputlist = new ArrayList<>();
dataStream.flatMap(new FlatMapFunction<String, List<String>>() {
@Override
public void flatMap(String value, Collector<List<String>> out) throws Exception {
System.out.println("==================DATASTREAM-VALUE=====================" +value);
myOutputlist.add(value);
out.collect(myOutputlist);
}
});
return myOutputlist;
}
}
有没有办法我可以把它放进一个列表中,我甚至尝试添加一个接收器,并尝试将输出捕获到同样不起作用的列表事件中
不确定生产代码,但在几个测试用例中,我在List
周围使用了CollectSink
包装器,类似于这个:
// a testing sink
class CollectSink implements SinkFunction<String> {
// must be static
public static final List<String> values = new CopyOnWriteArrayList<>();
@Override
public synchronized void invoke(String value) throws Exception {
values.add(value);
}
}
此接收器将收集value
列表中的元素。您只需要将此接收器添加到管道中。
更新:正如@kkrugler所指出的,用线程安全的CopyOnWriteArrayList
List
实现替换了ArrayList
,以便能够安全地使用这个并行度大于1的接收器。