i有2个输入,其中第一个输入是流(例如input1),第二个输入是批处理(例如input2)。我想弄清楚第一个输入中的键是单行匹配的还是第二个输入中的一个以上。进一步的转换/逻辑取决于行匹配的数量,是单行匹配还是多个行匹配(对于第一个输入中的至少一个键)
if(single row matches){
// do something
}else{
// do something
}
我尝试过的代码
val input1Pair = streamData.map(x => (x._1, x))
val input2Pair = input2.map(x => (x._1, x))
val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
val result = joinData.mapValues{
case(v, Some(a)) => 1L
case(v, None) => 0
}.reduceByKey(_ + _).filter(_._2 > 1)
我已经完成了上述编码。当我确实结果时。print,如果所有键在Input2中仅匹配一行,它就不会打印。由于Dstream可能具有多个RDD,因此不确定如何弄清楚Dstream是否为空。如果可能的话,我可以进行检查。
没有任何功能可以确定dstream是否为空,因为dstream代表一个集合随着时间的推移。从概念的角度来看,一个空的Dstream将是一个从未有数据的流,并且不是很有用。
可以做的是检查给定的Microbatch是否具有数据:
dstream.foreachRDD{ rdd => if (rdd.isEmpty) {...} }
请注意,在任何给定的时间点,只有一个RDD。
我认为实际问题是如何检查参考RDD和Dstream中数据之间的匹配数。可能是最简单的方法是与两个集合相交并检查相交大小:
val intersectionDStream = streamData.transform{rdd => rdd.intersection(input2)}
intersectionDStream.foreachRDD{rdd =>
if (rdd.count > 1) {
..do stuff with the matches
} else {
..do otherwise
}
}
我们还可以将以RDD为中心的转换放在foreachRDD
操作中:
streamData.foreachRDD{rdd =>
val matches = rdd.intersection(input2)
if (matches.count > 1) {
..do stuff with the matches
} else {
..do otherwise
}
}