我正在 Scala 中使用 Flink,我正在尝试获取每个存储库的独特问题计数。我有一个带有如下元组的数据流:(repo_name、issue_id、event_time(。如何获取每个repo_name的唯一issue_id计数?我想我必须使用mapWithState
,但我不确定如何使用它。
提前谢谢。
假设您想在 7 天的翻转时间窗口中处理事件。
// eventStream: stream of case classes of type GithubEvent
eventStream
// only look at IssuesEvent
.filter(e => e.`type` == "IssuesEvent")
// key by the name of the repository
.keyBy("repo.name")
// tumbling time window of a week
.timeWindow(Time.days(7))
// apply window function
.apply { (key, _, vals, out: Collector[(String)]) =>
var count = 0;
for (_ <- vals) {
count = count + 1;
}
out.collect(s"Repo name: $key Unique issues: $count")
}
要计算每个存储库的唯一问题数量,我们需要查看 IssuesEvents。我们按存储库的名称进行键控。然后,我们应用一个窗口函数来返回一个 String,指示问题的唯一数量。
引用:
在 Apache Flink 中引入 Stream Windows
眨眼窗口