我正在使用DataSet API,以便读取大量文件并将它们扔进cassandra。
在其中一个步骤中,我正在做很多HTTP请求,我想知道我每秒发送了多少个请求。
对于流API,使用滑动窗口是非常直接的,但是如何使用数据集API呢?您可能想看看Flink的指标系统。它具有测量速率的Meter度量类型,即特定事件在一段时间内发生的频率。
不幸的是,Meter指标还没有包含在Flink版本中,而只在当前的主分支(Flink 1.2 SNAPSHOT)中可用。所以你需要自己建立Flink。
如果您将请求的时间戳分配给DataSet
中的对象,则可以使用.groupBy()
转换,并使用键提取器提取窗口标识符(例如,从时间戳中解析的第二个),然后应用计数聚合。
例如,计数聚合可以通过使用.map()
变换扩展具有整数1
的元组,并在groupBy之后使用.sum()
变换汇总整数来完成。