我有一个正在运行的Kafka Streams应用程序,该应用程序目前正在从两个不同的主题创建两个KStreams。 这部分工作得很好。
现在,我想加入它们,并获得第一个值和第二个值的"聚合记录"。 键是简单的 Java 字符串,值是 avro 编码的 GenericRecords。
根据文档,我应该能够执行以下操作:
KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> { ??? }
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);
但是,从我在网上找到的文档或教程中不清楚我可以在上面的部分中做什么 { ??? }
. 我已经尝试了上述的多种变体,但没有运气。 我正在使用 Kakfa Streams 2.2.0 版本,如果它很重要的话。
我只想有一个 <key, merge value1 + value2>
的输出流,用于具有相同键的两个流上的记录。 我可以手动合并值,但不清楚如何访问 lambda 右侧的值。
> 在 ValueJoiner (left, right) -> { ??? }
中,左边表示左流中的值,右边表示右流中的值
您所要做的就是在ValueJoiner中添加您的代码,如下所示:
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> {
// You can get access to the generic Avro record by
// casting both left and right values
Record leftRecord = (Record) left;
Record rightRecord = (Record) right;
// For the original question, you can simply create a new GenericRecord
// with the contents of left and right records
GenericRecord record = new GenericData.Record(schema);
record.put("left", left);
record.put("right", right);
}
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);