如何对 Flink 进程函数进行单元测试?



我有一个简单的ProcessFunction,它将String作为输入并给出一个String作为输出。如何使用 Junit 对此进行单元测试?由于 processElement 方法是一个 void 方法,不返回任何值。

public class SampleProcessFunction extends ProcessFunction<String, String>{
@Override
public void processElement(String content, Context context, Collector<String> collector) throws Exception {
String output = content + "output";
collector.collect(output);
}
}

为了对此方法进行单元测试,请定义预期的行为。在这种情况下,预期的行为是以content + "output"作为参数Collector::collect方法的单个调用。 因此,可以使用模拟收集器进行测试。

下面是一个使用 Mockito 框架的示例:

...
private final Collector<String> collectorMock = Mockito.mock(Collector.class);
private final Context contextMock = Mockito.mock(Context.class);
private final SampleProcessFunction sampleProcessFunction = new SampleProcessFunction();
@Test
public void testProcessElement_shouldInvokeCollector_whenAnyValuePassed() throws Exception {
// given
final String content = "hello ";
// when
sampleProcessFunction.processElement(content, contextMock, collectorMock);
// then
Mockito.verify(collectorMock).collect(content + "output"); // verifies that collector method was invoked with "hello output" exactly once
}
...

如果你想在对象上,你可以使用:

Mockito.atLeastOnce();
Mockito.doReturn(myCustomObject);

相关内容

  • 没有找到相关文章

最新更新