我是 Flink 的新手,目前正在将 Spark 中的一些代码示例转换为 Flink。Flink 中用于JavaSparkContext
中并行化方法的类似函数是什么?我尝试转换以下代码:
JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel).map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer s) throws InterruptedException {
Thread.sleep(s * 1000);
return 0;
}
});
Flink 相当于 JavaSparkContext.parallelize()
是 ExecutionEnvironment.fromCollection()
。
因此,您的代码片段应转换为以下内容:
// get execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create data set from collection
DataSet<Integer> input = env.fromCollection(Arrays.asList(init_val));
// apply map function
DataSet<Integer> result = input.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer s) {
Thread.sleep(s * 1000);
return 0;
}
}).setParallelism(parallel); // set parallelism of map function
您将使用 ExecutionEnvironment
提供的 fromCollection
方法。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> input = env.fromCollection(inputList);