我的Flink工作经常与一个或另一个任务经理进行OOM。我有足够的内存和存储空间来完成我的工作(2个JobManager/16个TaskManagers,每个都有15core和63GB RAM(。有时作业在抛出OOM之前运行4天,有时作业在2天内进入OOM。但与前几天相比,交通量是稳定的。
我收到了一个建议,不要在流管道中传递对象,而是使用原语来减少混洗开销和内存消耗。
我所做的那个棘手的工作是用Java编写的。让我们说下面是我的管道
Kafka source
deserialize (converted bytes to java object, the object contains String, int, long types)
FirstKeyedWindow (the above serialized java objects received here)
reduce
SecondKeyedWindow (the above reduced java objects received here)
reduce
Kafka sink (above java objects are serialized into bytes and are produced to kafka)
我的问题是,为了减少开销和内存消耗,我应该考虑什么?用char数组替换String是否有助于减少开销?或我应该只处理整个管道中的字节吗?如果我在KeyedWindows之间序列化对象,这有助于减少开销吗?但是如果我必须读回字节,那么我需要反序列化,根据需要使用,然后序列化它。这不会造成更多的序列化/反序列化开销吗?
感谢你的建议。Headsup,我说的是每天收到10TB的数据。
更新1:
OOM的例外情况如下:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:远程任务管理器"host/host:port"意外关闭了连接。这可能表示远程任务管理器丢失。
回答David Anderson的以下评论:使用的Flink版本是v1.11。使用的状态后端是基于文件系统的RocksDB。作业的堆内存不足。来自Kafka源的每条消息的大小最大可达300Bytes。reduce函数执行重复数据消除(删除同一组中的重复项(,第二个reduce功能执行聚合(更新对象内的计数(。
更新2:
经过深入研究,我发现Flink使用Kyro默认序列化程序,效率很低。我知道,如果我们定义一个而不是使用Kyro默认值,那么custom_serializer可以帮助减少开销。我现在正在试用谷歌protobuf,看看它是否表现得更好。
而且,我也期待着增加taskmanager.network.memory.fract,它适合我的工作并行性。然而,要找出正确的计算来设置上述配置。
我在这里回答我自己的问题,因为我的尝试对我有效。我在Grafana中发现了与我的Flink工作相关的额外指标。其中两个度量是GC时间和GC计数。我看到GC(垃圾回收(指标出现了一些不错的峰值。原因可能是,我有一些新的对象创建正在进行作业管道。考虑到我正在处理的TB数据和每天200亿条记录,这个对象的创建失控了。我对它进行了优化,以尽可能多地重用对象,从而减少了内存消耗。
我已经将taskmanager.network.memory增加到所需的值,该值设置为1GB默认值。
在上面的问题中,我讨论了自定义序列化程序以减少网络开销。我尝试用Kyro实现protobuf序列化程序,protobuf生成的类是最终的。如果我必须更新对象,我必须创建新的对象,这将在GC度量中创建尖峰。所以,避免使用它。也许我可以进一步更改protobuf生成类来满足我的需求。如果情况不一致,将考虑采取该步骤。