我认为我有一个相当非标准的用例。我想使用filter
函数将我的源流拆分为几个流:
val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)
我还有一个时间戳提取器(传入事件将以XML形式附加一个时间标记(:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...
class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}
我之所以选择这种方法,是因为我不想简单地做单个流(val s = dataStream.filter(...).map(...).filter(...).map(...)
(,因为我想构建一个网络,它可以拆分/组合任意流(例如s1+s2->c1,s1+s3->c2,c2+s4->c3,…(
现在,当通过上述示例发送事件时,事件E1可能同时出现在s1和s2中。这意味着,在我的理解中,完全相同的事件E1被作为第一实例放入s1(E1a(,也被作为第二实例放入s2(E1b(。
所以我现在想做的是将E1a和E1b重新组合成一个组合的E1,它类似于E1,同时也是s1和s2的变换。
我试过了:
val c1 = s1.join(s2)
.where(_.key).equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })
然而,这些事件似乎从未达到应用功能,我也不知道为什么。
我的例子有什么错?我这样的流网络的方法/想法会奏效吗?
你安排好水印了吗?使用事件时间时,只有当水印到达时,才会触发窗口,该水印使事件时间时钟提前超过窗口的末尾。您可以使用时间戳提取器/水印生成器来完成此操作;有关更多详细信息,请参阅文档中的示例。
如果其中一个流有时是空闲的,这也会导致问题,因为空闲流上没有水印会阻碍它所连接的任何流的水印。
根据您要做的具体操作,您可能会发现使用CoProcessFunction比使用时间窗口联接更容易。举个例子,看看Flink训练网站上关于状态丰富和过期状态的练习。