从Flink中的RichMapFunction调用RichSinkFunction



我有一个从RichSinkFunction扩展而来的接收器,它正在缓存一些信息。当我的过程完成时,我想更新所有缓存的信息,以便强制调用它。

我可以从KeyedProcessAccumulatorFunction调用该接收器,用类型为ReadOnlyContext的上下文调用它,它就可以工作了。

public class PageAccumulateFunction implements KeyedProcessAccumulatorFunction{
public SessionAccumulator accumulate(
@NonNull Tuple2<CollectionMessage, PropertyInfo> value,
@NonNull SessionAccumulator accumulator,
@NonNull KeyedBroadcastProcessFunction.ReadOnlyContext ctx) {
....
ctx.output(outputTag, message);
}
}

但是在我的RichMapFunction类中,我不能调用那个接收器。我可以获得RuntimeContext对象(但不能获得ReadOnlyContext(,但我不知道是否可以使用它来调用RichSinkFunction接收器。

public class SessionMapper extends RichMapFunction<SessionAccumulator, GenericRecord>{
public GenericRecord map(SessionAccumulator sessionAccumulator) {
....
RuntimeContext ctx = getRuntimeContext();
....
}
}

知道吗?

只有流程函数才能使用侧输出(通过ctx.output写入(。

MapFunction自动向下游(向信宿(发送其map方法的返回值。它是这样工作的,因为映射是从输入到输出的一对一映射。大多数其他函数类型(例如,流程函数、平面图(都会传递一个收集器,您可以使用该收集器向下游发送事件。

相关内容

  • 没有找到相关文章

最新更新