。
我想读取多个文件,计算重复行,按重复数量进行排序,占据前10个重复行。
lines = env.readTextFile("logs-dir")
tuples = lines.map(line -> Tuple2(line, 1))
aggregate = tuples.groupBy(0).sum(1)
sort = aggregate.sortPartition(1, Order.DESCENDING)
sorted.first(10).writeAsText("domains")
问题是第一-n是任意的,并从所有分区返回随机10个第一元素。
有没有一种方法可以从所有分区中选择排序的第一个元素,而无需将并行性减少为1?
我将用并行的MapPartitionFunction
解决此问题,该问题将返回每个分区的前10个元素,将结果发送到一个分区,对其进行排序并再次采用前10个分区。看起来像这样:
lines = env.readTextFile("logs-dir")
tuples = lines.map(line -> Tuple2(line, 1))
aggregate = tuples.groupBy(0).sum(1)
// sort partitions in parallel
sortPart = aggregate.sortPartition(1, Order.DESCENDING)
// take first 10 of each partition
firstPart = sortPart.mapPartition(new First(10))
// sort all in one partition
sortFull = firstPart.sortPartition(1, Order.DESCENDING).parallelism(1)
// take first 10
first10 = sortFull.mapPartition(new First(10))
first10.writeAsText("domains")
MapPartitionFunction
First
非常简单。它只会计算出多少记录,然后从mapPartition()
功能返回0
。