将值发送到输出主题后清除KTable条目



我有一个数据库,用于存储每个网页的页面浏览量。它通过使用名为pageviews的Kafka主题来实现这一点,其中每条消息的页面名称为keyvalue为自上一条消息以来的视图数。

这是pageviews主题中预期的消息示例:

页面浏览主题:

key: "index", value: 349
key: "products", value: 67
key: "index", value: 15
key: "about", value: 11
...

pageviews的使用者每次都将上述values添加到PAGEWWS表中。

现在,我正在构建pageviews主题的制作人。该应用程序的数据源是viewstream主题,其中每个视图创建一条消息,如:

viewstream主题:

key: "index", value: <timestamp>
key: "index", value: <timestamp>
key: "product", value: <timestamp>
...

在Kafka Stream应用程序上,我有以下拓扑结构:

PageViewsStreamer:

builder.stream("viewstream")
.groupByKey()
.aggregate(...) // this builds a KTable with the sums of views per page
.toStream()
.to("pageviews")

我对这个拓扑有两个问题:

  1. 在向pageviews生成输出消息后,保存聚合的KTable不会被重置/清除,因此,通过简单地将聚合值添加到DB表,我们会得到错误的结果。如何实现发送到pageviews的每条消息不包括以前消息中已经发送的视图?

  2. 我希望pageviews消息每15分钟发送一次(默认速率约为每30秒(。

我正在尝试为两者使用窗口,但到目前为止我都失败了。

您可以使用15分钟的滚动窗口来实现此行为,并在窗口时间过去之前抑制结果(记住添加一个宽限时间来绑定前一个窗口将接受的事件的延迟(。在此处查看详细信息。我会这样做:

builder.stream("viewstream")
.groupByKey()
//window by a 15-minute time windows, accept event late in 30 second, you can set grace time smaller
.windowedBy(TimeWindows.of(Duration.ofMinutes(15)).grace(Duration.ofSeconds(30)))
.aggregate(...) // this builds a KTable with the sums of views per page
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
//re-select key : from window to key
.selectKey((key, value) -> key.key())
.to("pageviews");

相关内容

  • 没有找到相关文章

最新更新