ProcessAllWindowFunction的Scala单元测试



阅读官方flink测试文档后(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html)我能够使用测试线束为ProcessFunction开发测试,类似于以下内容:

pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")
testHarness =
new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, PendingPartition](
new ProcessOperator[StaticAdequacyTilePublishedData, PendingPartition](pendingPartitionBuilder)
)
testHarness.open()

现在,我正在尝试对ProcessAllWindowFunction做同样的操作,它看起来像这样:

class MapVersionValidationDistributor(batchSize: Int) extends
ProcessAllWindowFunction[MapVersionValidation, Seq[StaticAdequacyTilePublishedData],TimeWindow] {
lazy val state: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("latestMapVersion", classOf[Long]))
(...)

首先我意识到我不能将TestHarness用于ProcessAllWindowFunction,因为它没有processElement方法。在这种情况下,我应该遵循什么单元测试策略?

编辑:目前我的测试代码如下:

val collector = mock[Collector[Seq[StaticAdequacyTilePublishedData]]]
val mvv = new MapVersionValidationDistributor(1)
val input3 = Iterable(new MapVersionValidation("123",Seq(TileValidation(1,true,Seq(1,3,4)))))
val ctx = mock[mvv.Context]
val streamContext =  mock[RuntimeContext]
mvv.setRuntimeContext(streamContext)
mvv.open(mock[Configuration])
mvv.process(ctx,input3,collector)

我得到了这个错误:

Unexpected call: <mock-3> RuntimeContext.getState[T](ValueStateDescriptor{name=latestMapVersion, defaultValue=null, serializer=null}) Expected: inAnyOrder { }

您并不需要测试线束来对ProcessAllWindowFunctionprocess方法进行单元测试。process函数接受3个参数:ContextIterable[IN]Collector[OUT]。根据用于模拟Context的语言,您可以使用一些库。您还可以根据您在这里的前提条件轻松地实现或模拟Collector。Iterable[IN]只是一个包含Your窗口元素的Iterable,它将在窗口被触发后传递给函数。

相关内容

  • 没有找到相关文章

最新更新