如何计算滚烫中类型管道中行中列的频率



我目前正在使用烫伤进行mapreduce作业。我正在尝试根据我在typedpipe中的行中看到特定值的次数来设置阈值。例如,如果我的typedpipe中有这些行:

专栏 1 |第 2 栏

"嗨" |"嘿">

"嗨" |"嗬">

"嗨" |"嗬">

"再见" |"再见">

我想在每一行附加我在每行的第 1 列和第 2 列中看到值的频率。这意味着输出如下所示:

专栏 1 |专栏 2 |列 1 频率 |列 2 频率

"嗨" |"嘿"|3 |1

"嗨" |"嗬" |3 |阿拉伯数字

"嗨" |"嗬" |3 |阿拉伯数字

"再见" |"再见" |1 |1

目前,我通过按每列对类型化的管道进行分组来做到这一点,如下所示:

  val key2Freqs = input.groupBy('key2) {
    _.size('key2Freq)
  }.rename('key2 -> 'key2Right).project('key2Right, 'key2Freq);

然后使用 key2Freqs 连接原始输入,如下所示:

  .joinWithSmaller('key2 -> 'key2Right, key2Freqs, joiner = new LeftJoin)

然而,这真的很慢,在我看来,对于本质上非常简单的任务来说效率很低。它变得特别长 b/c 我有 6 个不同的键想要获取这些值,我目前正在工作中映射和加入 6 个不同的时间。一定有更好的方法可以做到这一点,对吧?

如果每列中不同值的数量足够小,可以将它们全部放入内存中,则可以将列.mapMap[String,Int] 中,然后.groupAll.sum一次性将它们全部计数(我使用的是"类型化 api"表示法,不太记得在字段中是如何完成的, 但你明白了(。你需要使用algebird的MapMonoid,或者如果你不想为这件事添加依赖关系,就写你自己的,这并不难。然后,您最终会得到一个管道,其中包含生成的Map的单个条目。现在,您可以获取原始管道,并执行.crossWithTiny将包含计数的地图带入其中,然后.map提取单个计数。

否则,如果你不能把所有这些都记在心里,那么你现在在做什么似乎是唯一的办法......除非您实际上正在寻找"顶级击球手"的近似值,而不是整个宇宙的精确计数......在这种情况下,请查看algebird的SketchMap。

相关内容

  • 没有找到相关文章

最新更新