Bellow是我写的简单代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = new ListBuffer[Tuple3[String,Int,Int]]
val random = new Random()
for(x <- 0 to 4){
if(random.nextBoolean()){
list.append(("INSERT",2,1))
} else {
list.append(("UPDATE",2,1))
}
}
val data = env.fromElements(list).flatMap(_.toList)
val keyed = data.keyBy(0).sum(1)
keyed.print()
val reKeyed = keyed.keyBy(0).sum(2)
reKeyed.print()
env.execute()
重新键入的数据流应将键入视为输入数据源。但是,打印的结果显示它们来自原始数据源。 如果第二次只调用KeyBy而不调用sum方法, 打印的结果是相关的。 那么,问题出在哪里呢?
问题是,如果您调用keyBy
两次,第二次调用将覆盖第一次调用,因此元素最终可能会位于与以前不同的任务管理器上。 对于这种情况,您正在谈论您实际上想要使用DataStreamUtils.reinterpretAsKeyedStream
,它应该完全按照您的描述工作,这意味着它不应该更改先前键控Datastream
的分区。
我找不到给定代码段的任何错误,并怀疑您的期望与 API 不匹配。
我在源以及第一组和第二组求和中添加了一些打印语句。
source:1> (UPDATE,2,1)
source:1> (INSERT,2,1)
source:1> (UPDATE,2,1)
source:1> (UPDATE,2,1)
source:1> (INSERT,2,1)
first:3> (UPDATE,2,1)
first:2> (INSERT,2,1)
first:3> (UPDATE,4,1)
first:2> (INSERT,4,1)
first:3> (UPDATE,6,1)
second:2> (INSERT,2,1)
second:3> (UPDATE,2,1)
second:2> (INSERT,2,2)
second:3> (UPDATE,2,2)
second:3> (UPDATE,2,3)
如您所见,随机输入由 3 个更新和 2 个插入语句组成。因此,第一个keyBy
的结果正确显示了update,6,1
和insert,4,1
.
现在,该结果将用作第二keyBy
的输入,但由于您正在对第二列求和,因此将丢弃第一个操作的结果。您可能会期望将第一keyBy
的"最终"总和作为对第二列求和的基本记录。但实际上,它始终是作为基础的第一张唱片,这是流媒体设置中唯一合理的选择。
您真正想要的是同一组中两个字段的总和。不幸的是,流式处理 API 没有捷径,但自己实现起来很容易。
val keyed = data.keyBy(0)
.reduce((tuple1, tuple2) => (tuple1._1, tuple1._2 + tuple2._2, tuple1._3 + tuple2._3))
keyed.print("first")
这会产生
source:4> (INSERT,2,1)
source:4> (INSERT,2,1)
source:4> (INSERT,2,1)
source:4> (UPDATE,2,1)
source:4> (INSERT,2,1)
first:3> (UPDATE,2,1)
first:2> (INSERT,2,1)
first:2> (INSERT,4,2)
first:2> (INSERT,6,3)
first:2> (INSERT,8,4)
此解决方案也更有效,因为分组数据非常昂贵。