我有一个卡夫卡流 - 比如博客和卡夫卡表 - 说与这些博客相关的评论。来自 kafka 流的键可以映射到 Kafka 表中的多个值,即一个博客可以有多个评论。我想连接这两个并创建一个带有注释 ID 数组的新对象。但是当我进行连接时,流只包含最后一个注释 ID。是否有任何文档或示例代码可以为我指明如何实现这一目标的正确方向?基本上,是否有任何文档详细说明如何使用 Kafka 流和 Kafka 表进行一对多关系连接?
KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
(blogId, blog) -> blog.getBlogId(),
(blog, comment) -> new EnrichedBlog(blog, comment));
所以不是评论 - 我需要有一个评论 ID 数组。
我在您的代码示例中找不到签名匹配的 join 方法,但我认为这是问题所在:
KTables 被解释为 changlog,也就是说,具有相同键的每条下一条消息都被解释为对记录的更新,而不是新记录。这就是为什么您只看到给定键(博客 ID)的最后一个"评论"消息,以前的值被覆盖。 为了克服这个问题,您需要首先更改填充 KTable 的方式。您可以做的是将评论主题作为 KStream 添加到拓扑中,然后执行聚合,该聚合仅生成共享相同博客 ID 的数组或评论列表。该聚合返回一个 KTable,您可以使用它加入您的博客 KStream。
下面是如何构建列表值 KTable 的草图:
builder.stream("yourCommentTopic") // where key is blog id
.groupByKey()
.aggregate(() -> new ArrayList(),
(key, value, agg) -> new KeyValue<>(key, agg.add(value)),
yourListSerde);
列表在聚合中比数组更容易使用,因此我建议您在需要时将其转换为下游数组。您还需要为您的列表提供一个 serde 实现,即上面示例中的"yourListSerde"。
如果您将 avro 与模式注册表一起使用,则应编写自己的聚合器,因为 kafka 流无法序列化 ArrayList。
val kTable = aStream
.groupByKey()
.aggregate(
{
YourAggregator() // initialize aggregator
},
{ _, value, agg ->
agg.add(value) // add value to a list in YourAggregator
agg
}
)
然后将kTable
与您的其他流(bStream
)一起加入
bStream
.join(
kTable,
{ b, a ->
// do your value join from a to b
b
}
)
对不起,我的片段是用 Kotlin 写的。
正如上面 Michal 的正确答案所指出的,在这种情况下,由blogId
键控的KTable
不能用于跟踪博客,因为只有最新的博客值保留在这样的表中。
作为对他的回答中提到的解决方案的建议优化,请注意,如果每个博客有很多评论,那么在.aggregate()
中保留不断增长的列表可能会在数据大小和时间上变得昂贵。这是因为在后台,该聚合的每次迭代都会导致不断增长的List
实例,由于数据重用,这在 java 或 scala 中是可以的,但每个实例都单独序列化到底层状态存储中。示意性地,假设某个键有 10 个注释,那么这个表达式被调用 10 次:
(key, value, agg) -> new KeyValue<>(key, agg.add(value))
每次生成大小为 1 的列表,然后是 2,然后......然后是 10 个,每个都独立序列化到 Under the Hood 状态存储中,这意味着总共将序列化1+2+3+...+10=55
值(好吧,也许有一些优化 s.t. 其中一些序列化被跳过了,我不知道,尽管我认为空间和时间的复杂性是相同的)。
另一种(尽管更复杂的)方法是在状态存储中使用range scans
,这使得数据结构看起来有点像 DynamoDB 等键值存储中的(partition_key, sort_key)
,其中我们使用(blogId, commentId)
这样的键存储每个注释。在这种情况下,您仍然会通过blogId
keyBy()
注释流,然后将其.transform(...)
以将其传递给处理器 API,您可以在其中应用范围扫描思想,每次向状态存储添加(即序列化)一个补充注释,而不是整个列表的新实例。
当我们想象(blogId, commentId)
键的许多实例时,一对多关系变得非常明显,这些实例都具有相同的blogId
和不同的commentId
,它们都存储在同一个物理节点的同一状态存储实例中,并且整个事情在很多节点中并行发生了很多blogId
。
我在我的博客上放了更多关于这种模式的细节:一对多Kafka Streams Ktable连接,我在github中放了一个完整的工作示例。