Spark to Flink 并行化方法



我是 Flink 的新手,目前正在将 Spark 中的一些代码示例转换为 Flink。Flink 中用于JavaSparkContext中并行化方法的类似函数是什么?我尝试转换以下代码:

JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel).map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer s) throws InterruptedException {
        Thread.sleep(s * 1000);
        return 0;
      }
    });

Flink 相当于 JavaSparkContext.parallelize()ExecutionEnvironment.fromCollection()

因此,您的代码片段应转换为以下内容:

// get execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create data set from collection
DataSet<Integer> input = env.fromCollection(Arrays.asList(init_val));
// apply map function
DataSet<Integer> result = input.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer map(Integer s) {
    Thread.sleep(s * 1000);
    return 0;
  }
}).setParallelism(parallel); // set parallelism of map function

您将使用 ExecutionEnvironment 提供的 fromCollection 方法。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> input = env.fromCollection(inputList);

相关内容

  • 没有找到相关文章

最新更新