Flink 流处理对每个存储库的唯一问题进行计数



我正在 Scala 中使用 Flink,我正在尝试获取每个存储库的独特问题计数。我有一个带有如下元组的数据流:(repo_name、issue_id、event_time(。如何获取每个repo_name的唯一issue_id计数?我想我必须使用mapWithState,但我不确定如何使用它。

提前谢谢。

假设您想在 7 天的翻转时间窗口中处理事件。

// eventStream: stream of case classes of type GithubEvent 
eventStream
   // only look at IssuesEvent
  .filter(e => e.`type` == "IssuesEvent")
   // key by the name of the repository
  .keyBy("repo.name")
   // tumbling time window of a week
  .timeWindow(Time.days(7))
   // apply window function
  .apply { (key, _, vals, out: Collector[(String)]) =>
    var count = 0;
    for (_ <- vals) {
      count = count + 1;
    }
    out.collect(s"Repo name: $key Unique issues: $count")
  }

要计算每个存储库的唯一问题数量,我们需要查看 IssuesEvents。我们按存储库的名称进行键控。然后,我们应用一个窗口函数来返回一个 String,指示问题的唯一数量。

引用:

  • 在 Apache Flink 中引入 Stream Windows

  • 眨眼窗口

相关内容

  • 没有找到相关文章

最新更新