我在使用 Apache Flink Streaming API 时遇到了问题。
我可以使用自定义数据源设置整个 CEP 环境,当在该源上使用标准接收器(如"print(("(时,一切正常。
这是我的水槽现在的样子:
@RequiredArgsConstructor
public class EventDataConsumer extends RichSinkFunction<EventData>{
private final transient Consumer<EventData> consumer;
@Override
public void invoke(EventData eventData) throws Exception {
consumer.accept(eventData);
}
}
我试图实现的是,将方法引用传递给此 SinkFunction,该函数应针对我的 DataStream 中的每个元素执行。
这是我初始化 SinkFunction 的方式:
EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData);
outStream.addSink(consumer);
我的问题是,当我在自定义接收器的"invoke"方法中设置断点时,即使我显式调用构造函数(它分配了使用者(,使用者似乎也为 null。
由于接收器分布到与接收器并行性一样多的实例,因此它应该是可序列化的。在群集上执行时,Sink
被序列化发送到反序列化的TaskManagers
。
在您的示例中,consumer
字段是transient
,这就是为什么序列化后它变得null
。