我正在使用 flink 有状态函数进行一个新项目。我已经使用 FunctionTestHarness 编写了一些基本的单元测试,但使用此方法的测试无法测试有状态函数之间的交互。
flink 测试文档(基本 flink,不用于有状态函数(演示如何使用MiniClusterWithClientResource
运行完整的作业,然后对作业的输出做出断言。我正在寻找一种对有状态函数做类似事情的方法。
statefun-flink-harness-example看起来非常有前途,但是使用Harness的RunnerTest
标有@Ignore
,因为它永远不会终止。这对于调试很有用,但不能在自动测试中使用。
以下是我到目前为止发现的问题,这些问题使得编写以 Harness 终止的测试变得困难:
Harness 使用 SerializableSupplier- 来提供输入,SerializableSupplier 无法说它已完成。这意味着任何使用 Harness 的测试始终在等待更多输入。
- 如果 Harness 知道所有输入都已发送,则需要一种在没有挂起事件时终止的方法。
- 更复杂的是,由于上下文
.sendAfter()
发送的延迟事件,某些系统仍然永远不会终止
我认为这是启用可以从 CI/CD 进程运行的更有趣的自动化测试的常见需求。有没有人找到解决上述问题的方法,或者使用Harness以外的工具发现了一种完全不同的方法?
Harness 还包含一个.withFlinkSourceFunction()
方法,允许使用任何 FlinkSourceFunction
作为入口。
您可以创建自己的源函数,该函数将生成有限的元素集合,例如:
class FiniteSource<T extends Serializable> implements SourceFunction<T> {
private final List<T> items;
FiniteSource(List<T> items) {
this.items = items;
}
@Override
public void run(SourceContext<T> sourceContext) {
for (T item : items) {
sourceContext.collect(item);
}
}
@Override
public void cancel() {}
}
然后,可以通过以下方式修改线束示例:
FiniteSource<MyInputMessage> finiteSource = new FiniteSource<>(
Arrays.asList(
new MyInputMessage("user-1", "hello"),
new MyInputMessage("user-2", "world")));
Harness harness =
new Harness()
.withKryoMessageSerializer()
.withFlinkSourceFunction(MyConstants.REQUEST_INGRESS,finiteSource)
.withPrintingEgress(MyConstants.RESULT_EGRESS);
harness.start();
这应该在入口中生成两条输入消息后终止。如果你认为这是一个常见的要求,那么我鼓励你在 Flink 邮件列表中提出这个问题, 我相信那里的友好社区会很乐意接受您的反馈,更重要的是;)
有没有人找到解决上述问题的方法,或者可能发现了一个 使用线束以外的工具完全不同的方法?
对于 CI/CD 管道,我建议查看我们的 e2e 测试, 基于测试容器。(例如这个(