如果问题本身表明缺乏知识,请道歉。我有一个单一的源,我已经根据处理的事件中键的某个值进行了拆分(通过端输出(。3/4 个唯一事件类型有一个键,我可以用来确保它们属于同一个完整事务,但第 4 个事件只能通过某些时间戳条件配对 - 可能有 n 个> 1 个这样的事件需要配对。这是食品订单交付系统的示例;
Event1: {'order_id' : xxx, 'event_type' : 'order_confirmed', 'ts' : IS8601}
Event2: {'order_id' : xxx, 'vehicle_number' : yyy, 'event_type' : 'order_picked_up_by_driver', 'ts' : IS8601}
**Event3a: {'vehicle_number' : xxx ,'event_type' : 'driver_reached_checkpoint', 'ts' : IS8601}
**Event3b: {'vehicle_number' : xxx, 'event_type' : 'driver_reached_checkpoint', 'ts' : IS8601 + ~1 hour}
Event4: {'order_id' : xxx, 'event_type' : 'order_delivered', 'ts' : IS8601 + ~1 hour}
请注意,事件 1、2 和 4 具有order_id但 3a、3b 没有(尽管可与事件 2 配对(。目前的计划是按order_id将事件 1、2、4 键放在一起,并存储要传递的状态,直到已知 3a、3b 是唯一适合 Event2 和 4s 时间戳范围内的事件。由于要order_id的车辆编号是一对多关系,因此每当我看到新事件 2 时,我需要更新驾驶员的状态以指向新订单,并返回上一个订单的所有事件以进行进一步处理。
真的可以使用一些方向来解决这个问题,即使是面包屑。Flink 和流媒体的新手。
---编辑---
我想存储每个订单的状态,一旦我看到每个事件类型的实例(在事件 3 的情况下是多个(,将这些事件合并在一起,在输出到另一个源和接收器之前计算一些统计信息。我无法使用键控状态运算符,因为事件类型 3 没有可用的键"order_id"。我可以放心地假设事件是有序的。
示例输出,let et = 事件类型
'order_completed_ts' : <et4>, 'number_of_checkpoints_en_route_to_completion' : <et3>}
对于不同的顺序,可以有多个相同类型的事件。
样本
1. {'order_id' : 1, 'event_type' : 'order_confirmed', 'ts' : IS8601}
2. {'order_id' : 2, 'event_type' : 'order_confirmed', 'ts' : IS8601}
3. {'order_id' : 3, 'event_type' : 'order_confirmed', 'ts' : IS8601}
4. {'order_id' : 4, 'event_type' : 'order_confirmed', 'ts' : IS8601}
5. {'order_id' : 1, 'event_type' : 'order_picked_up_by_driver', 'ts' : IS8601}
- 注意示例元素 5 如何包含与元素 1 相同的订单 ID
在我看来,您希望管道的第一阶段将order_ids与没有它们的事件相关联,这将使分析的其余部分更加简单。
如果你使用 Flink SQL,那么这将是一个临时表连接,你可以在vehicle_number
上将流与自身连接。您可以通过以下方式执行此操作:每个driver_reached_checkpoint
事件都将与driver_reached_checkpoint
事件之前的最新order_picked_up_by_driver
事件联接。
或者,要使用数据流 API 执行此操作,您可以按vehicle_number
对流进行键控,并实现一个在键控状态下记住最近order_picked_up_by_driver
事件的KeyedProcessFunction
。然后,它可以将该事件中的order_id
添加到它看到的每个driver_reached_checkpoint
事件中。
一旦您有一个流,其中包含每个事件的order_id
,您就可以按order_id
(如果使用 SQL,则按 GROUP BY(进行键控。