Flink - Insufficient number of network buffers with high parallelism



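I'm hitting an error when trying to spin up my pipeline, and I'm wondering whether there is a way around it without heavy configuration tuning: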

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 32768 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:268)
at org.apache.flink.runtime.io.network.NetworkEnvironment.setupPartition(NetworkEnvironment.java:212)
at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:193)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:749)

As the error suggests, I have already tried bumping both taskmanager.network.memory.fraction (0.1 -> 0.7) and taskmanager.network.memory.max (1 GB -> 4 GB), which should be more than enough for my current configuration.

My current configuration is:

  • 10 instances / task managers
  • 25 parallelism / task slots per task manager
  • 250 task slots in total
  • Task manager heap size: 44 GB

Following https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#configuring-the-network-buffers, the calculation should be:

#slots-per-TM^2 * #TMs * 4 * 32000 bytes/buffer -> 25^2 * 10 * 4 * 32000 = 800 MB roughly

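With the default max network memory configuration (1 GB max), spinning up the pipeline with a parallelism of 200 works fine, but anything beyond 210 runs into problems. I'm having a hard time figuring out why 4 GB is still not enough in this case.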

Any tips would be great.

Thanks

Edit (adding a sketch of the pipeline):

final SingleOutputStreamOperator<MyObject> stream = eventDataStream
    .rebalance() // evenly distribute the IO heavy augmenter work to all available workers.
    // Filter bad events
    .filter(new BadEventFilter())
    .name("BadEventFilter")
    .map(myMapper1)
    .name("myMapper1")
    .disableChaining()
    .flatMap(myFlatMapper)
    .name("myFlatMapper")
    .disableChaining()
    .map(myMapper2)
    .name("myMapper2")
    .startNewChain()
    .keyBy(mySelector)
    // Use the keyed events to create appropriate grouping
    .process(groupingProcessing)
    .name("groupingProcessing")
    .rebalance()
    .flatMap(myGroupingFlatMapper)
    .name("myGroupingFlatMapper")
    .startNewChain()
    .map(myMapper3)
    .name("myMapper3")
    .process(sideArchive)
    .name("sideArchive");

stream.getSideOutput("myOutputTag")
    .addSink(sink)
    .name("archive");

stream.addSink(sink)
    .name("sink");

The problem is that you have too many shuffles, so you need a lot of network memory buffers.

The formula

#slots-per-TM^2 * #TMs * 4

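is for a single shuffle step, and unless I'm mistaken you have 7 or 8 of them (2 rebalances, 2 new chain starts, 2 disabled chainings, 2 sinks). In that case, even a rough estimate gives 25^2 * 10 * 4 * 8 = 200,000 network buffers, whereas you currently have only a little over 32,000.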
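The estimate above can be reproduced in a few lines of Java. This is only a sketch of the rule-of-thumb arithmetic quoted in this answer: the class and method names (BufferEstimate, buffersPerShuffle, buffersForJob) are made up for illustration and are not part of Flink, and the shuffle count of 8 is the rough guess from above rather than a measured value.

```java
// Hypothetical helper, not a Flink API: it only reproduces the rule-of-thumb arithmetic.
class BufferEstimate {

    // Buffers for one shuffle step: #slots-per-TM^2 * #TMs * 4.
    static long buffersPerShuffle(long slotsPerTm, long taskManagers) {
        return slotsPerTm * slotsPerTm * taskManagers * 4;
    }

    // Rough total when the job contains several shuffle steps.
    static long buffersForJob(long slotsPerTm, long taskManagers, long shuffles) {
        return buffersPerShuffle(slotsPerTm, taskManagers) * shuffles;
    }

    public static void main(String[] args) {
        long configured = 32768; // buffer count reported in the error message
        long perShuffle = buffersPerShuffle(25, 10);
        long total = buffersForJob(25, 10, 8); // 8 shuffles is an assumed count

        System.out.println("per shuffle step: " + perShuffle);
        System.out.println("for 8 shuffles:   " + total);
        System.out.println("configured:       " + configured);
        System.out.println("enough buffers?   " + (configured >= total));
    }
}
```

By this estimate a single shuffle step (25,000 buffers) fits into the configured pool, but eight of them do not.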

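The reason simply raising the limit to 4 GB didn't help is probably that the 4 GB max was never reached: the actual amount is calculated as a fraction (taskmanager.network.memory.fraction) of the memory managed by Flink.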
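To illustrate why raising only the max may change nothing, here is a minimal sketch of how a fraction-based size with a lower and an upper bound behaves. It assumes the network memory is computed as fraction × base memory and then clamped to [min, max], which is my reading of the three keys named in the error message. The class NetworkMemorySketch, its method, and the 4 GiB base figure are hypothetical, and the base memory the fraction is actually applied to depends on the Flink version.

```java
// Hypothetical sketch, not Flink code: a fraction of some base memory, clamped to [min, max].
class NetworkMemorySketch {

    static final long MIB = 1L << 20;
    static final long GIB = 1L << 30;

    // Assumed rule: clamp(fraction * base, min, max).
    static long networkMemoryBytes(long baseBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (baseBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, byFraction));
    }

    public static void main(String[] args) {
        long base = 4 * GIB;  // assumed base memory, for illustration only
        long min = 64 * MIB;  // assumed lower bound, for illustration only

        // The fraction yields 512 MiB, below either max: raising the max has no effect.
        System.out.println(networkMemoryBytes(base, 0.125, min, 1 * GIB) / MIB + " MiB");
        System.out.println(networkMemoryBytes(base, 0.125, min, 4 * GIB) / MIB + " MiB");

        // With a larger fraction, the max becomes the limiting factor.
        System.out.println(networkMemoryBytes(base, 0.5, min, 1 * GIB) / MIB + " MiB");
        System.out.println(networkMemoryBytes(base, 0.5, min, 4 * GIB) / MIB + " MiB");
    }
}
```

Note that the error message in the question still reports 32768 buffers of 32768 bytes each, which is exactly 1 GiB in total, so it may be worth checking which values the task managers actually picked up.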

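Honestly, I don't see any reason to disable chaining and start new chains; that only introduces more shuffles. So rather than tuning the network settings, I'd suggest removing the new chain creation and simplifying the pipeline a bit: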

final SingleOutputStreamOperator<MyObject> stream = eventDataStream
    // Filter bad events
    // filter before rebalance to reduce the network traffic.
    .filter(new BadEventFilter())
    .name("BadEventFilter")
    .rebalance() // evenly distribute the IO heavy augmenter work to all available workers.
    .map(myMapper1)
    .name("myMapper1")
    //.disableChaining()
    .flatMap(myFlatMapper)
    .name("myFlatMapper")
    //.disableChaining()
    .map(myMapper2)
    .name("myMapper2")
    //.startNewChain()
    .keyBy(mySelector)
    // Use the keyed events to create appropriate grouping
    .process(groupingProcessing)
    .name("groupingProcessing")
    //.rebalance()
    .flatMap(myGroupingFlatMapper)
    .name("myGroupingFlatMapper")
    //.startNewChain()
    .map(myMapper3)
    .name("myMapper3")
    .process(sideArchive)
    .name("sideArchive");

stream.getSideOutput("myOutputTag")
    .addSink(sink)
    .name("archive");

stream.addSink(sink)
    .name("sink");
