我有一个带有钥匙值的vaceoid-userid的dstream,什么是count vide vact的好练习?
// VideoID,UserID
foo,1
foo,2
bar,1
bar,2
foo,1
bar,2
如上所述,我想随时删除冗余foo,1
和bar,2
,以获取vaceoid-countuserid,因此结果应为:
foo: 2
bar: 2
换句话说,我想在内存中保存一个大状态数据集。当新的Dstream到达时,将其与数据集进行比较以计算每个视频的不同用户。
如何做?
我正在使用Spark 1.6,但是接受了Onward版本的答案。如果可能的话,Python代码。
为了获得视频ID分组的不同用户ID计数,请考虑使用 gentregateByKey 。抱歉,这是Scala,所以您必须翻译。
val rdd = sc.textFile("your_file.txt")
val initialSet = Set.empty[Int]
val addToSet = (s: Set[Int], v:Int) => s + v
val mergeSets = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2
val distinctValSets = rdd.aggregateByKey(initialSet)(addToSet, mergeSets)
val distinctValCountd = rdd.map({case(k,s) => (k,s.size)})
初始集合是您的聚合对象,addtoset和Mergesets的初始值,指定如何在集合中添加值并根据键合并不同的集合。这应该为您提供与每个视频相关的用户的数量,并且比ReedbyKey和GroupByKey更便宜(空间)。
val rdd1 = sc.parallelize(Seq(("foo", 1),("foo", 2),("foo", 1)))
rdd1.groupByKey.mapValues(x=>x.toSet.toSeq).flatMapValues(x=>x).collect