如何在纯 java 中设置 flink 并行性 (IDEA)



我正在使用以下 scala 代码来运行我的 flink 流作业

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
mystream.addSource(new mySource(params))
.map(new myMap(params))
.addSink(new mySink(params)).setParallelism(1)
mystream.setParallelism(1)
mystream.execute("My Streaming")

当我使用flink run -p 1时,并行度为1(不知道是-p有效还是代码有效(。 当我使用纯Java运行时(在IDEA中,我想它是在纯Java中运行的(,并行度通常是5,这表明我的代码不起作用。如何控制它?


正如最高答案所建议的那样,以下代码也不起作用,仍然具有 5 的平行论。

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
mystream.addSource(new mySource(params))
.map(new myMap(params))
.addSink(new mySink(params))
mystream.setParallelism(1)
mystream.execute("My Streaming")

在环境中设置默认并行度。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(...)

使用.addSink(new mySink(params)).setParallelism(1)将覆盖特定运算符的默认并行性。

相关内容

  • 没有找到相关文章

最新更新