// create the RDD
val rdd = sc.makeRDD(List(("a", (1, "m")), ("b", (1, "m")),
  ("a", (1, "n")), ("b", (2, "n")), ("c", (1, "m")),
  ("c", (5, "m")), ("d", (1, "m")), ("d", (1, "n"))))
val groupRDD = rdd.groupByKey()
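After groupByKey, I want to drop every key whose entries all have 1 as the number in the value tuple, and keep the remaining keys, to get:
("b", (1, "m")), ("b", (2, "n")), ("c", (1, "m")), ("c", (5, "m"))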
Using groupByKey() is a requirement. Any help would be appreciated, thank you very much.
Edit:
What if that element is a string instead, and I want to drop the keys whose entries all equal x? For example, given:
("a",("x","m")), ("a",("x","n")), ("b",("x","m")), ("b",("y","n")), ("c",("x","m")), ("c",("z","m")), ("d",("x","m")), ("d",("x","n"))
I expect the equivalent result:
("b",("x","m")), ("b",("y","n")), ("c",("x","m")), ("c",("z","m"))
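You can do it like this:
val groupRDD = rdd
  .groupByKey()
  // keep a group only if the sum of its numbers differs from its size, i.e. not every number is 1
  .filter(value => value._2.map(tuple => tuple._1).sum != value._2.size)
  // flatten back to one row per entry; before this step a row looks like (b, Seq((1, m), (2, n)))
  .flatMapValues(list => list)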
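What this does: we first group the entries by key with groupByKey, then use filter on each grouped entry to check whether the sum of its numbers is the same as the size of the group. If every number is 1, the sum equals the size and the group is dropped. For example:
(a, Seq((1, m), (1, n))) -> grouped by key
sum = 1 + 1 = 2, size of the sequence = 2
2 == 2, so this row is filtered out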
Final result:
(c,(1,m))
(b,(1,m))
(c,(5,m))
(b,(2,n))
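Note that the sum-versus-size check relies on the sample values: a group such as (0, m), (2, n) also sums to its size and would be dropped even though neither number is 1. Below is a minimal sketch of a more direct check, assuming the intent is "keep every key that has at least one entry whose first element differs from the target". The helper name hasOtherThan and the local SparkContext setup are assumptions made for illustration and are not part of the original answer; the snippet is written to be pasted into spark-shell.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper: true when at least one entry's first element differs from `target`.
def hasOtherThan[A](values: Iterable[(A, String)], target: A): Boolean =
  values.exists { case (first, _) => first != target }

// Assumption: a local context for experimenting; inside spark-shell,
// getOrCreate simply returns the context that already exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("filter-groups-sketch").setMaster("local[*]"))

val rdd = sc.makeRDD(List(
  ("a", (1, "m")), ("b", (1, "m")), ("a", (1, "n")), ("b", (2, "n")),
  ("c", (1, "m")), ("c", (5, "m")), ("d", (1, "m")), ("d", (1, "n"))))

val kept = rdd
  .groupByKey()
  // keep a group only if some entry's number is not 1
  .filter { case (_, values) => hasOtherThan(values, 1) }
  // flatten each group back into (key, (number, label)) rows
  .flatMapValues(values => values)

kept.collect().foreach(println)
```

With the sample data only the rows for b and c should survive; the order in which they are printed is not guaranteed.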
Good luck!
Edit
Assuming the key taken from the value tuple can be any string, and that rdd is your data, containing:
(a,(x,m))
(c,(x,m))
(c,(z,m))
(d,(x,m))
(b,(x,m))
(a,(x,n))
(d,(x,n))
(b,(y,n))
then you can build uniqueCount as:
val uniqueCount = rdd
  // re-key each entry by (key, first tuple element): (a, x), (b, x), (b, y), (c, x), etc.
  .map(entry => ((entry._1, entry._2._1), entry._2._2))
  // count per combined key on the driver: (a, x) gives 2, (b, x) gives 1, (b, y) gives 1, etc.
  .countByKey()
  // keep only combinations that occur exactly once; anything counted more than once is a duplicate
  .filter(a => a._2 == 1)
  // take the original keys, so we can filter on them below
  .map(a => a._1._1)
  .toList
Then:
val filteredRDD = rdd.filter(a => uniqueCount.contains(a._1))
which gives the following output:
(b,(y,n))
(c,(x,m))
(c,(z,m))
(b,(x,m))
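The counting approach above never mentions x: it keeps a key when some (key, string) combination occurs exactly once. That matches the sample data, but a key with a single (x, ...) entry would be kept, and a key whose entries are all y would be dropped. The following is a hedged sketch of the same two-step shape (collect the keys to keep, then filter) that tests against x directly. It assumes, like the original, that the set of keys fits in driver memory; the name keysToKeep and the local SparkContext setup are illustrative assumptions, and the snippet is meant for spark-shell.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context for experimenting; inside spark-shell,
// getOrCreate simply returns the context that already exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("filter-by-key-set-sketch").setMaster("local[*]"))

val rdd = sc.makeRDD(List(
  ("a", ("x", "m")), ("a", ("x", "n")), ("b", ("x", "m")), ("b", ("y", "n")),
  ("c", ("x", "m")), ("c", ("z", "m")), ("d", ("x", "m")), ("d", ("x", "n"))))

// Step 1: for each key, record whether any entry's string differs from "x".
val keysToKeep: Set[String] = rdd
  .mapValues { case (first, _) => first != "x" }
  .reduceByKey(_ || _)                       // true if at least one entry differed
  .filter { case (_, anyOther) => anyOther }
  .keys
  .collect()
  .toSet                                     // Set gives constant-time contains

// Step 2: keep only the rows whose key was selected.
val filteredRDD = rdd.filter { case (key, _) => keysToKeep.contains(key) }

filteredRDD.collect().foreach(println)
```

On the sample data this should select the keys b and c, the same rows as the output above, though possibly in a different order. Using a Set rather than a List keeps the lookup inside filter cheap when there are many keys.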