Apache Flink:如何使用跳过策略



我想在第一次模式匹配之后跳过匹配的事件。

如何使用Flink的跳过策略?

我有一个简单的方案,如果 downloads > 1000 ,然后向用户发出警报。

在我的实施中,在第一场比赛之后,由于1000后的多次下载增加,它不断发出警报。

在第一场比赛后如何跳过所有警报?

我已经阅读了跳过策略文档,但示例或实施对我有帮助。

cep更多地用于重复的模式,并且在这里确实不起作用。您正在谈论的跳过策略实际上被称为"匹配跳过策略",这意味着您想匹配多个模式。

我认为这样做的更好的方法就是过程函数,例如:

class AlertUserOnce extends ProcessFunction[element, element] {
  lazy val turnOff: ValueState[Boolean] = getRuntimeContext
     .getState(new ValueStateDescriptor[Boolean]("turn-function-off", classOf[Boolean]))

  override def processElement(value: element, ctx: Context, out: Collector[element]): Unit = {
   if (turnOff.value == null && element.downloads > 1000) {
     alertUser(value)
     turnOff.update(true)
    }
    out.collect(value)
  }
}
...
stream.keyBy(...).process(new AlertUserOnce) 

请查看此示例:https://github.com/elenikougiou/flink-cep-examples/blob/main/src/src/main/java/java/flinkcep/cases/cepcase4.java

 AfterMatchSkipStrategy skipToLast = AfterMatchSkipStrategy.skipToLast("start"); // Skip "start" event (a)
        // Choose after match skip strategy
        AfterMatchSkipStrategy skipStrategy = skipToLast;
        Pattern<Event, ?> pattern = Pattern.<Event>begin("start", skipStrategy).where(new SimpleCondition<>() {
            @Override
            public boolean filter(Event value){
                return value.getName().startsWith("a");
            }

相关内容

  • 没有找到相关文章

最新更新