在Kafka Streams DSL中使用Inner Join获取记录密钥



是否有一种方法可以从kafka流DSL中的join部分传递或访问消息键?

我现在有这样的东西:

    KStream<String, GenericRecord> completedEventsStream = inputStartKStream.
        join(
            inputEndKStream,
            (leftValue, rightValue) -> customLambda((Record) leftValue, (Record) rightValue),
            JoinWindows.of(windowDuration),
            Joined.with(stringSerde, genericAvroSerde, genericAvroSerde)
        );

但是,leftValuerightValue记录传递给customLambda无法访问KAFKA消息键,因为那是一个单独的字符串。他们拥有的唯一内容是消息本身,而不是密钥。

有没有一种方法可以从联接lambda内部访问密钥?我能做的一件事就是简单地将消息键添加为消息本身的一部分,然后在那里访问它,但是我想知道该框架是否提供了直接访问的方法?

大多数情况下,密钥在记录的值中也可用,您的应用程序不是吗?

看来ValueJoiner接口作为KIP-149的一部分进行了改进,但没有像KIP中的其他方法那样完成:ValueTransformerValueMapper

您可以在加入之前添加一个步骤以提取键,并在使用ValueMapperWithKey加入之前将其包含在消息的值中。

相关内容

  • 没有找到相关文章

最新更新