我有一个分组的JavaPairRDD<String,Iterable<String>>
,看起来像这样
(null,[null])
(01,[POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, AMAN, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdatrviceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkviceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:, POGUpdateTenentKafkaListenerServiceImpl:])
(10,[POGUpdateTenestenerServiceImpl:])
(23,[POGUpdateTenentKafkaListenerServiceImpl:])
现在对于每个数字,我想计算一个值出现的次数。例如,在本例中,我希望输出为
01,POGUpdateTenestenerServiceImpl=23: AMAN=1
表示键1。
scala实现
input.map(r=>
(r._1, r._2.groupBy(identity).mapValues(_.size).toList)
)
JavaPairRDD<String, Iterable<String>> pairRDD = ...;
JavaPairRDD<String, Map<String, Integer>> resultPairRDD = pairRDD.mapValues(new Function<Iterable<String>, Map<String, Integer>>() {
@Override
public Map<String, Integer> call(Iterable<String> arg0) throws Exception {
Map<String, Integer> countMap = new HashMap<String, Integer>();
for(String s:arg0){
int curCnt = countMap.containsKey(s) ? countMap.get(s) : 0;
countMap.put(s, (curCnt+1));
}
return countMap;
}
});