如何使用ApacheFlinkCEPSQL从已经匹配的模式中获取事件



我的要求是基于2个事件生成触发器(EVT_A和EVT_B独立于订单(。以下是预期

1. EVT_A arrived. --> No action
2. EVT_B arrived  --> Should Trigger
3. EVT_B arrived  --> should Trigger since A was received previously (o/p should include A and current B)
4. EVT_A arrived  --> should Trigger since B was received previously (o/p should include current A and last B)
5. EVT_A arrived  --> should Trigger since B was received previously (o/p should include current A and last B)

我试着追随,但没有成功。

SELECT E.*
From MyEvents
MATCH_RECOGNIZE (
ORDER BY procTime
MEASURES ARRAY[
Event(A.id, A.name, A.date),
Event(B.id, B.name, B.date)
] AS Events
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A C* B)
DEFINE
A AS name in ('EVT_A', 'EVT_B'),
B AS name in ('EVT_A', 'EVT_B') AND B.name <> A.name,
C AS name not in ('EVT_A', 'EVT_B')
) AS E;

我还试着用">在将跳跃匹配到第一个A之后";。但它也是一个例外。关于如何使用Flink SQL CEP或Flink中的任何其他方式实现这一点的任何建议。

在这种情况下,RichFlatMap或ProcessFunction似乎是最简单的方法。只需要一点状态:

ValueState<Event> lastA;
ValueState<Event> lastB;

然后处理每个传入事件的逻辑是这样的:

if EVT_A
store event as lastA
emit lastB with this event unless lastB is null
if EVT_B
store event as lastB
emit lastA with this event unless lastA is null

如果你想了解如何使用Flink的托管状态,文档中有一个教程。

相关内容

  • 没有找到相关文章

最新更新