我目前正在使用烫伤进行mapreduce作业。我正在尝试根据我在typedpipe中的行中看到特定值的次数来设置阈值。例如,如果我的typedpipe中有这些行:
专栏 1 |第 2 栏
"嗨" |"嘿">
"嗨" |"嗬">
"嗨" |"嗬">
"再见" |"再见">
我想在每一行附加我在每行的第 1 列和第 2 列中看到值的频率。这意味着输出如下所示:
专栏 1 |专栏 2 |列 1 频率 |列 2 频率
"嗨" |"嘿"|3 |1
"嗨" |"嗬" |3 |阿拉伯数字
"嗨" |"嗬" |3 |阿拉伯数字
"再见" |"再见" |1 |1
目前,我通过按每列对类型化的管道进行分组来做到这一点,如下所示:
val key2Freqs = input.groupBy('key2) {
_.size('key2Freq)
}.rename('key2 -> 'key2Right).project('key2Right, 'key2Freq);
然后使用 key2Freqs 连接原始输入,如下所示:
.joinWithSmaller('key2 -> 'key2Right, key2Freqs, joiner = new LeftJoin)
然而,这真的很慢,在我看来,对于本质上非常简单的任务来说效率很低。它变得特别长 b/c 我有 6 个不同的键想要获取这些值,我目前正在工作中映射和加入 6 个不同的时间。一定有更好的方法可以做到这一点,对吧?
如果每列中不同值的数量足够小,可以将它们全部放入内存中,则可以将列.map
到 Map[String,Int]
中,然后.groupAll.sum
一次性将它们全部计数(我使用的是"类型化 api"表示法,不太记得在字段中是如何完成的, 但你明白了(。你需要使用algebird的MapMonoid
,或者如果你不想为这件事添加依赖关系,就写你自己的,这并不难。然后,您最终会得到一个管道,其中包含生成的Map
的单个条目。现在,您可以获取原始管道,并执行.crossWithTiny
将包含计数的地图带入其中,然后.map
提取单个计数。
否则,如果你不能把所有这些都记在心里,那么你现在在做什么似乎是唯一的办法......除非您实际上正在寻找"顶级击球手"的近似值,而不是整个宇宙的精确计数......在这种情况下,请查看algebird的SketchMap。