如何在价值转换器中将事件从标点符号实例向下游转发



在KafkaStream中,当实现ValueTransformer或ValueTransformerWithKey时,在transform((调用时,我会安排一个新的标点符号。当执行标点符符的方法punctuate(( 时,我希望它使用上下文实例向下游转发事件。但是,上下文实例似乎未定义作为 DSL 拓扑的一部分。

关于如何使用变压器执行此操作的任何线索?

在处理器中使用相同的逻辑,实现它工作的低级处理器拓扑。

在 ValueTransformerWithKey 中:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}

在My标点符号中:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;
    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }
    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}

执行时

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));

我希望标点符号生成的警报事件在过期后被警告到 ALARM 主题。

相反,我得到了以下异常:不支持ProcessorContext.forward((。

像往常一样,我在javadoc中找到了关于ValueTransformerWithKey接口的答案:https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

请注意,在转换中使用 ProcessorContext.forward(Object, Object

( 或 ProcessorContext.forward(Object, Object, To( 是不允许的,这将导致异常。

但是,实现 Transformer 接口允许使用 context.forward((。感谢@Matthias J. 萨克斯

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

如果要向下游转发多个输出记录,可以使用 ProcessorContext.forward(Object, Object( 和 ProcessorContext.forward(Object, Object, To(。如果记录不应转发到下游,则转换可以返回 null。

最新更新