我有一个从RichSinkFunction扩展而来的接收器,它正在缓存一些信息。当我的过程完成时,我想更新所有缓存的信息,以便强制调用它。
我可以从KeyedProcessAccumulatorFunction调用该接收器,用类型为ReadOnlyContext的上下文调用它,它就可以工作了。
public class PageAccumulateFunction implements KeyedProcessAccumulatorFunction{
public SessionAccumulator accumulate(
@NonNull Tuple2<CollectionMessage, PropertyInfo> value,
@NonNull SessionAccumulator accumulator,
@NonNull KeyedBroadcastProcessFunction.ReadOnlyContext ctx) {
....
ctx.output(outputTag, message);
}
}
但是在我的RichMapFunction类中,我不能调用那个接收器。我可以获得RuntimeContext对象(但不能获得ReadOnlyContext(,但我不知道是否可以使用它来调用RichSinkFunction接收器。
public class SessionMapper extends RichMapFunction<SessionAccumulator, GenericRecord>{
public GenericRecord map(SessionAccumulator sessionAccumulator) {
....
RuntimeContext ctx = getRuntimeContext();
....
}
}
知道吗?
只有流程函数才能使用侧输出(通过ctx.output
写入(。
MapFunction
自动向下游(向信宿(发送其map
方法的返回值。它是这样工作的,因为映射是从输入到输出的一对一映射。大多数其他函数类型(例如,流程函数、平面图(都会传递一个收集器,您可以使用该收集器向下游发送事件。