我正在尝试使用flink对多个数据流进行数据充实。
这里我有一些数据在account_stream和status_stream。我想将这些数据添加到来自多个不同来源的所有其他流中。所有流的数据中都有一个共同的字段:"account_id"
这是我采取的方法。
account_stream.connect(status_stream)
.flat_map(EnrichmentFunction())
.filter(lambda x: x['name'] != "-" and x['date'] != "0000-00-00 00:00:00")
.key_by(lambda row: row['account_id'])
.connect(stream1)
.flat_map(function_2())
.filter(lambda x: x!="2")
.key_by(lambda row: row['account_id'])
.connect(stream2)
.flat_map(function_2())
.key_by(lambda row: row['account_id'])
.connect(stream3)
.flat_map(function_3())
.key_by(lambda row: row['account_id'])
.connect(stream4)
.flat_map(function_4())
.key_by(lambda row: row['account_id'])
.connect(stream5)
.flat_map(function_5())
.key_by(lambda row: row['account_id'])
.connect(stream6)
.flat_map(function_6())
.key_by(lambda row: row['account_id'])
.connect(stream7)
.flat_map(function_7())
.key_by(lambda row: row['account_id'])
.connect(stream_8)
.flat_map(function_8())
.map(lambda a: str(a),Types.STRING())
.add_sink(kafka_producer)
我在状态中保存必要的数据,并使用flat_map函数将其附加到所有流。最后添加一个kafka sink来发送所有富含state的流。
现在一旦我执行这个,我得到这个错误:">. io .IOException:网络缓冲区数量不足:需要17个,但只有8个可用。当前网络缓冲区的总数被设置为2048,每个缓冲区32768字节。"
我尝试将taskmanager.memory.network.fraction更改为0.5,taskmanager.memory.network.max设置为15gb和taskmanager.memory.process。在flink配置文件中。但误差还是一样的。除了保存它之外,我还需要做些什么来查看flink job中反映的更改吗?还是有别的问题?
也让我知道如果这种方法是无效的任务,如果有其他的东西我应该尝试?
我使用单个32gb ram, 8核服务器在python中运行这个pyflink库,kafka和elastic在同一台服务器上运行。
谢谢。
配置TaskManager的网络内存,请参考官方文档中的"设置TaskManager内存"页面。有几件事需要注意:
taskmanager.memory.network.fraction
的总flink内存用作网络内存。如果导出的大小小于/大于配置的最小/最大大小,则使用最小/最大大小。- 网络内存大小不能超过进程总内存大小。
- 你可以在TaskManager的日志开始处找到当前网络内存的最大/最小值。检查你的配置是否有效。
如果你可以升级Flink到1.14,你可以尝试最新的功能:细粒度资源管理。有了这个功能,网络内存将自动配置为每个TaskManager所需的数量。但是,要使用此特性,您需要为每个操作符设置SlotSharingGroups,并为它们配置CPU和内存资源。欲知详情,请参考官方文档。