是否有一种方法可以从kafka流DSL中的join
部分传递或访问消息键?
我现在有这样的东西:
KStream<String, GenericRecord> completedEventsStream = inputStartKStream.
join(
inputEndKStream,
(leftValue, rightValue) -> customLambda((Record) leftValue, (Record) rightValue),
JoinWindows.of(windowDuration),
Joined.with(stringSerde, genericAvroSerde, genericAvroSerde)
);
但是,leftValue
和rightValue
记录传递给customLambda
无法访问KAFKA消息键,因为那是一个单独的字符串。他们拥有的唯一内容是消息本身,而不是密钥。
有没有一种方法可以从联接lambda内部访问密钥?我能做的一件事就是简单地将消息键添加为消息本身的一部分,然后在那里访问它,但是我想知道该框架是否提供了直接访问的方法?
大多数情况下,密钥在记录的值中也可用,您的应用程序不是吗?
看来ValueJoiner
接口作为KIP-149的一部分进行了改进,但没有像KIP中的其他方法那样完成:ValueTransformer
和ValueMapper
。
您可以在加入之前添加一个步骤以提取键,并在使用ValueMapperWithKey
加入之前将其包含在消息的值中。