简介
我必须编写分布式应用程序,该应用程序计算 3 条记录的最大唯一值数。我在这方面没有经验,根本不了解框架。我的输入可能如下所示:
u1: u2,u3,u4,u5,u6
u2: u1,u4,u6,u7,u8
u3: u1,u4,u5,u9
u4: u1,u2,u3,u6
...
然后结果的开头应该是:
(u1,u2,u3), u4,u5,u6,u7,u8,u9 => count=6
(u1,u2,u4), u3,u5,u6,u7,u8 => count=5
(u1,u3,u4), u2,u5,u6,u9 => count=4
(u2,u3,u4), u1,u5,u6,u7,u8,u9 => count=6
...
所以我的方法是首先合并每两条记录,然后将每个合并的对与每条记录合并。
问题
我可以在像 hadoop/spark 这样的帧中同时在多个输入行上工作(合并)这样的操作吗?或者也许我的方法不正确,我应该以不同的方式这样做?
任何建议将不胜感激。
我可以在像 hadoop/spark 这样的帧中同时在多个输入行上工作(合并)这样的操作吗?
是的,你可以。
或者也许我的方法不正确,我应该以不同的方式这样做?
这取决于数据的大小。如果数据较小,则在本地执行会更快、更轻松地完成。如果你的数据很大,至少几百GB,常见的策略是将数据保存到HDFS(分布式文件系统),并使用Mapreduce/Spark进行分析。
用 scala 编写的 Spark 应用程序示例:
object MyCounter {
val sparkConf = new SparkConf().setAppName("My Counter")
val sc = new SparkContext(sparkConf)
def main(args: Array[String]) {
val inputFile = sc.textFile("hdfs:///inputfile.txt")
val keys = inputFile.map(line => line.substring(0, 2)) // get "u1" from "u1: u2,u3,u4,u5,u6"
val triplets = keys.cartesian(keys).cartesian(keys)
.map(z => (z._1._1, z._1._2, z._2))
.filter(z => !z._1.equals(z._2) && !z._1.equals(z._3) && !z._2.equals(z._3)) // get "(u1,u2,u3)" triplets
// If you have small numbers of (u1,u2,u3) triplets, it's better prepare them locally.
val res = triplets.cartesian(inputFile).filter(z => {
z._2.startsWith(z._1._1) || z._2.startsWith(z._1._2) || z._2.startsWith(z._1._3)
}) // (u1,u2,u3) only matches line starts with u1,u2,u3, for example "u1: u2,u3,u4,u5,u6"
.reduceByKey((a, b) => a + b) // merge three lines
.map(z => {
val line = z._2
val values = line.split(",")
//count unique values using set
val set = new util.HashSet[String]()
for (value <- values) {
set.add(value)
}
"key=" + z._1 + ", count=" + set.size() // the result from one mapper is a string
}).collect()
for (line <- res) {
println(line)
}
}
}
- 代码未经过测试。而且效率不高。它可以进行一些优化(例如,删除不必要的map-reduce步骤)。
- 您可以使用 Python/Java 重写相同的版本。
- 你可以使用Hadoop/Mapreduce实现相同的逻辑。