我试图写一个简单的测试,但不确定为什么它不工作这里是我的代码。不知道我错过了什么。在测试中似乎从来没有调用触发器来输出。管道只是在每一个窗口的每一秒发出字符串的聚合输出。我错过了什么?
object CollectSink {
val collectedResults = mutable.ListBuffer[(Int, String)]()
}
class CollectSink extends SinkFunction[(Int, String)] {
override def invoke(value: (Int, String), context: SinkFunction.Context): Unit = {
CollectSink.collectedResults += value
}
}
class MyTestSuite extends BaseSuite with BeforeAndAfterAll with BeforeAndAfterEach {
val flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build
)
override def beforeAll() = {
flinkCluster.before()
}
override def beforeEach() = {
CollectSink.collectedResults.clear()
}
override def afterAll() = {
flinkCluster.after()
}
it should "Simple pipeline emits the expected output" in {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val sourceDataStream = env.fromCollection(
Seq(
(1, "a"),
(1, "b"),
(2, "c"),
(1, "d"),
(2, "e")
)
)
val collectSink = new CollectSink()
// Sample pipeline that emits aggregated strings
// It emits the result every second for every window of 1 second
sourceDataStream
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.seconds(1)))
.reduce(new ReduceFunction[(Int, String)] {
override def reduce(value1: (Int, String), value2: (Int, String)): (Int, String) = (value1._1, value1._2 + value2._2)
})
.addSink(collectSink)
env.execute()
// Issue here is that it never collects anything somehow, size is always 0
// It seems like the the pipeline never emits anything in this test
CollectSink.collectedResults should have size 1
}
}
参考测试文档: https://nightlies.apache.org/flink/flink - docs -释放- 1.16 -/- docs/dev/datastream/testing/# testing-flink-jobs
为了保证这个测试产生结果,它必须至少运行一秒钟。只有5个事件要处理,这是不可能发生的。
有界源的作业一旦处理完所有输入就会停止。未调用挂起的处理时间计时器。
两个可能的解决方法:
- 切换到使用事件时间。当有界源的作业到达其输入的结束时,Flink触发所有挂起的事件时间计时器。
- 使用无界源,并保持测试作业运行足够长的时间,以便产生结果。