我有一个数据库,用于存储每个网页的页面浏览量。它通过使用名为pageviews
的Kafka主题来实现这一点,其中每条消息的页面名称为key
,value
为自上一条消息以来的视图数。
这是pageviews
主题中预期的消息示例:
页面浏览主题:
key: "index", value: 349
key: "products", value: 67
key: "index", value: 15
key: "about", value: 11
...
pageviews
的使用者每次都将上述values
添加到PAGEWWS表中。
现在,我正在构建pageviews
主题的制作人。该应用程序的数据源是viewstream
主题,其中每个视图创建一条消息,如:
viewstream主题:
key: "index", value: <timestamp>
key: "index", value: <timestamp>
key: "product", value: <timestamp>
...
在Kafka Stream应用程序上,我有以下拓扑结构:
PageViewsStreamer:
builder.stream("viewstream")
.groupByKey()
.aggregate(...) // this builds a KTable with the sums of views per page
.toStream()
.to("pageviews")
我对这个拓扑有两个问题:
在向
pageviews
生成输出消息后,保存聚合的KTable不会被重置/清除,因此,通过简单地将聚合值添加到DB表,我们会得到错误的结果。如何实现发送到pageviews
的每条消息不包括以前消息中已经发送的视图?我希望
pageviews
消息每15分钟发送一次(默认速率约为每30秒(。
我正在尝试为两者使用窗口,但到目前为止我都失败了。
您可以使用15分钟的滚动窗口来实现此行为,并在窗口时间过去之前抑制结果(记住添加一个宽限时间来绑定前一个窗口将接受的事件的延迟(。在此处查看详细信息。我会这样做:
builder.stream("viewstream")
.groupByKey()
//window by a 15-minute time windows, accept event late in 30 second, you can set grace time smaller
.windowedBy(TimeWindows.of(Duration.ofMinutes(15)).grace(Duration.ofSeconds(30)))
.aggregate(...) // this builds a KTable with the sums of views per page
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
//re-select key : from window to key
.selectKey((key, value) -> key.key())
.to("pageviews");