我有一个Fink拓扑,它由多个Map和FlatMap转换组成。源/接收来自/到Kafka。Kakfa记录的类型为Envelope
(由其他人定义),并且没有标记为"可序列化"。我想对这个拓扑进行单元测试。我定义了一个简单的SourceFunction,它返回Envelope
的列表作为源:
public class MySource extends RichParallelSourceFunction<Envelope> {
private List<Envelope> input;
public MySource(List<Envelope> input) {
this.input = input;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void run(SourceContext<Envelope> ctx) throws Exception {
for (Envelope listElement : inputOfSubtask) {
ctx.collect(listElement);
}
}
@Override
public void cancel() {}
}
我使用MiniClusterWithClientResource对拓扑进行单元测试。我遇到了两个问题:
- 我需要使MySource可序列化,因为Flink想要/需要序列化源。作为解决办法,我使
input
瞬态。允许代码编译。 - 然后我遇到运行时错误:
org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
我试图理解为什么我得到这个错误,这是我以前没有得到的,当拓扑使用KafkaConsumer从kafka集群消费。我找到了一个解决方法,通过使用以下命令提供Type信息:
.returns(TypeInformation.of(Envelope.class))
- 然而,在运行时,反序列化后,
input
被设置为null
(显然,因为没有定义反序列化方法)。
问题:
- 有人能帮我理解为什么我得到
InvalidTypesException
例外吗? - 为什么mysql被反序列化/序列化?是否有一种方法,我可以无效,而使用miniclusterwithclientresource ?
- 我可以破解一些
writeObject()
和readObject()
方法在MySource。但我不想走那条路。是否有可能使用一些框架/类来测试拓扑,而不提供可序列化的源(和接收器)?如果我可以使用像KeyedOneInputStreamOperatorTestHarness
这样的东西,我可以作为拓扑传递,并且在开始时避免整个反序列化/序列化步骤,那就太好了。
如有任何意见或指点,不胜感激。
谢谢你,艾哈迈德。
- 为什么我得到InvalidTypesException异常?
不确定,通常我需要查看工作流定义来了解类型信息被丢弃的位置。
- "为什么MySource被反序列化/序列化?">
因为Flink将操作符序列化,然后通过网络发送,然后反序列化,从而将它们分配给多台机器上的多个任务。
- "在使用MiniClusterWithClientResource时,是否有一种方法可以使此无效?">
是的。由于MiniCluster在单个JVM中运行,因此可以使用静态ConcurrentLinkedQueue
来保存所有Envelope
记录,而MySource
只从该队列中读取。
Nit:你的MySource
应该在open()
方法中设置transient boolean running
标志为true,在cancel()
方法中设置false,并在run()
方法的循环中检查它。