FLINK : java.io.IOException: Insufficient number of network



我正在尝试使用flink对多个数据流进行数据充实。

这里我有一些数据在account_stream和status_stream。我想将这些数据添加到来自多个不同来源的所有其他流中。所有流的数据中都有一个共同的字段:"account_id"

这是我采取的方法。

account_stream.connect(status_stream)
.flat_map(EnrichmentFunction())
.filter(lambda x: x['name'] != "-" and x['date'] != "0000-00-00 00:00:00")
.key_by(lambda row: row['account_id'])
.connect(stream1)
.flat_map(function_2())
.filter(lambda x: x!="2")
.key_by(lambda row: row['account_id'])
.connect(stream2)
.flat_map(function_2())
.key_by(lambda row: row['account_id'])
.connect(stream3)
.flat_map(function_3())
.key_by(lambda row: row['account_id'])
.connect(stream4)
.flat_map(function_4())
.key_by(lambda row: row['account_id'])
.connect(stream5)
.flat_map(function_5())
.key_by(lambda row: row['account_id'])
.connect(stream6)
.flat_map(function_6())
.key_by(lambda row: row['account_id'])
.connect(stream7)
.flat_map(function_7())
.key_by(lambda row: row['account_id'])
.connect(stream_8)
.flat_map(function_8())
.map(lambda a: str(a),Types.STRING())
.add_sink(kafka_producer)

我在状态中保存必要的数据,并使用flat_map函数将其附加到所有流。最后添加一个kafka sink来发送所有富含state的流。

现在一旦我执行这个,我得到这个错误:">. io .IOException:网络缓冲区数量不足:需要17个,但只有8个可用。当前网络缓冲区的总数被设置为2048,每个缓冲区32768字节。"

我尝试将taskmanager.memory.network.fraction更改为0.5taskmanager.memory.network.max设置为15gbtaskmanager.memory.process。在flink配置文件中。但误差还是一样的。除了保存它之外,我还需要做些什么来查看flink job中反映的更改吗?还是有别的问题?

也让我知道如果这种方法是无效的任务,如果有其他的东西我应该尝试?

我使用单个32gb ram, 8核服务器在python中运行这个pyflink库,kafka和elastic在同一台服务器上运行。

谢谢。

配置TaskManager的网络内存,请参考官方文档中的"设置TaskManager内存"页面。有几件事需要注意:

  1. taskmanager.memory.network.fraction的总flink内存用作网络内存。如果导出的大小小于/大于配置的最小/最大大小,则使用最小/最大大小。
  2. 网络内存大小不能超过进程总内存大小。
  3. 你可以在TaskManager的日志开始处找到当前网络内存的最大/最小值。检查你的配置是否有效。

如果你可以升级Flink到1.14,你可以尝试最新的功能:细粒度资源管理。有了这个功能,网络内存将自动配置为每个TaskManager所需的数量。但是,要使用此特性,您需要为每个操作符设置SlotSharingGroups,并为它们配置CPU和内存资源。欲知详情,请参考官方文档。

相关内容

最新更新