我的拓扑是这样的:kafka(p:6)->reduce(p:6)->db writer(p:12)
(其中p:是并行度)。
- 我让它在带有
taskmanager.numberOfTaskSlots: 30
的单节点"集群"上运行 - 我知道我的卡夫卡音源每分钟产生约650万张唱片
- kafka"reader"的并行度等于kafka分区的#
当我观察这个作业(通过flink UI)大约1分钟时,我看到的值是:
- kafka->减少:发送约150万条记录(减少4倍以上)
- 减少(5秒的窗口聚合)->db写入发送的114K条记录(减少>2倍)1
- 接收到的数据库写入-->记录:~23K(关闭>5x)2
(其他部件的发送/接收值之间的差异较小,但我可以将其归因于测量误差)
问题:
1.其余的记录在哪里
2.这台机器运行时,负载永远不会超过1.5。还有其他限制因素吗
3.我是否误读了UI中的值?
Java 8
Flink 1.0(最新github)
机器:32核/96 Gb RAM
1这可以用聚合过程来解释
2这个值与写入数据库的内容一致。
Flink不会丢失记录,它们只是在飞行中缓冲,或者在Kafka中停留更长时间。从数字来看,您似乎正在经历背压。
您可以看到,"reducer"已经发出了许多"db-writer"尚未收到的记录。在这种情况下,这些记录仍然在运营商之间的通信信道的缓冲器中。这些通道具有有限的缓冲量(取决于配置的缓冲区的数量,通常为几个MB)。对于小型记录,他们可能会保持一些10公里以上的记录。
如果一个运算符中发送的记录数一直明显落后于接收运算符中接收的记录数,则这表明接收器(此处为"db写入器")无法跟上数据速率。也许是因为数据库处理插入的速度不够快(太同步、太细粒度提交?),也许"数据库写入器"和数据库之间的网络已经饱和。
在这种情况下,"db编写器"将对reducer进行背压,reducer最终也将对Kafka Source进行背压。
为了测试如果没有来自数据库的背压,数据速率会是多少,可以尝试一个实验,"数据库写入器"只需删除所有记录。