我正在使用 Flink 和 FlinkCEP 来检测数据流上的复杂事件。出于研究目的,我只需要测量识别时间。
我正在使用Flink / FlinkCEP - 1.7.1
.我正在使用env.fromCollection()
函数在 Flink 环境中创建流。 之后,我将使用FlinkCEP:CEP.pattern(....)
以及其他select
和print
函数。
我只找到了这篇文章:在 flink 中测量作业执行时间,这有很大帮助。它建议了一个返回流环境进程执行时间的解决方案。这不是我要找的。
我注意到返回的值包括其他运算符(例如.assignAscendingTimestamps(x => x.TimeStamp())
(的时间,因此我无法使用它。
有没有办法只测量CEP.pattern
过程的时间? 在这种情况下,我也找不到可以帮助我的指标,除非我错过了什么......
您可以向每条记录添加一个时间戳字段,并在 CEP 之前使用 mapFunction 将当前时间放入该字段中。然后使用它来计算 CEP 中经过的时间,然后立即在 RichMapFunction 中 - 然后您可以通过自定义指标报告该时间,或发送到接收器。这将增加一些开销,但不会太多。只要可以避免这两个函数之间的任何 keyBy 或重新平衡调用,所涉及的所有内容都将通过函数调用链接在一起,而不会产生任何序列化或网络开销。