如何根据Apache Flink中的第二个键拆分窗口?



我试图创建一个产品扫描仪的数据流处理,该扫描仪以以下Tuple4的形式生成事件:时间戳(长,以毫秒为单位),ClientID(int), ProductID(int), Quantity(int)。

最后,应该获得Tuple3的流:ClientID(int), ProductID(int), Quantity(int),它表示具有给定ClientID的客户端购买的具有相同ProductID的所有产品的分组。对于任何"交易"产品扫描之间最多可以有10秒的间隔。

这是一段简短的代码片段,显示了我最初的尝试:

DataStream<Tuple4<Long, Integer, Integer, Integer>> inStream = ...;
WindowedStream<Tuple4<Long, Integer, Integer, Integer>, Integer, TimeWindow> windowedStream = inStream
.keyBy((tuple) -> Tuple2.of(tuple.f1, tuple.f2))
.window(EventTimeSessionWindows.withGap(Time.seconds(10)));

windowedStream.aggregate(...); // Drop timestamp, sum quantity, keep the rest the same

然而,这就是问题所在。通常情况下,一个SessionWindow就足够了,但在本例中,它使用键(ClientID, ProductID)在两个事件之间实现了10秒的间隔,这不是预期的。

如果我们想象以下元组进来:

  1. (10_000, 1,1,1)<6秒间隔>
  2. (1,1,1,1)<6秒间隔>
  3. (22_000, 1,1,1)<6秒间隔>
  4. (28_000, 1,2,1)

元组的序列应该在同一个SessionWindow中,1和2应该分别与3合并,产生两个输出事件。但是,它们不在同一个SessionWindow中,因为1+3和2+4被keyBy分隔在各自的流中,并且它们没有聚合,因为它们不满足产品之间最大10秒的要求。

我想知道是否有一种方法可以解决这个问题与应用"second"关键。首先,应该根据关键字ClientID拆分流,然后应用SessionWindow(不管产品是什么)。在此之后,我想知道是否有一种方法可以通过使用第二个键(这将是ProductID)来细分ClientID键的SessionWindows,并有效地达到与以前相同的键(ClientID, ProductID),而没有前面的问题。然后,聚合可以正常应用以达到预期的输出流。

如果这是不可能的,有没有其他的方法来解决这个问题?

解决这个问题的最简单的方法是在ClientID上进行分区,以捕获特定客户端完成的所有扫描,然后使用process,这将使您可以访问特定窗口中的所有元素,在那里您可以为每个ProductID生成单独的事件或输出。有什么原因导致这在你的设置中不工作吗??

最新更新