拆分字符串两次并在 Scala 中减少 ByKey



我有一个.csv文件,我正在尝试使用Spark进行分析。.csv文件包含主题及其计数的列表。主题及其计数由","分隔,所有这些主题+计数都位于由";"分隔的同一字符串中,如下所示

"topic_1,10;topic_2,12;topic_1,3"

如您所见,某些主题多次出现在字符串中。

我有一个包含某个日期的键值对和主题字符串 [日期,主题字符串] 的 rdd

我想做的是在";"处拆分字符串以获取所有单独的主题,然后在","处拆分这些主题,并创建主题名称和计数的键值对,可以通过键减少。对于上面的例子,这将是

[date, ((topic_1, 13), (topic_2, 12))]

所以我经常玩火花,因为我是 scala 的新手。我试图做的是

val separateTopicsByDate = topicsByDate
.mapValues(_.split(";").map({case(str) => (str)}))
.mapValues({case(arr) => arr
.filter(str => str.split(",").length > 1)
.map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
})

问题是这会返回一个元组数组,我无法减少 ByKey。当我在";"处拆分字符串时,这将返回一个数组。我尝试将其映射到元组(如您从映射操作中看到的那样(,但这不起作用。

我使用的完整代码是

val rdd = sc.textFile("./data/segment/*.csv")
val topicsByDate = rdd
.filter(line => line.split("t").length > 23)
.map({case(str) => (str.split("t")(1), str.split("t")(23))})
.reduceByKey(_ + _)
val separateTopicsByDate = topicsByDate
.mapValues(_.split(";").map({case(str) => (str)}))
.mapValues({case(arr) => arr
.filter(str => str.split(",").length > 1)
.map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
})
separateTopicsByDate.take(2)

这返回

res42: Array[(String, Array[(String, Int)])] = Array((20150219001500,Array((Cecilia Pedraza,91), (Mexico City,110), (Soviet Union,1019), (Dutch Warmbloods,1236), (Jose Luis Vaquero,1413), (National Equestrian Club,1636), (Lenin Park,1776), (Royal Dutch Sport Horse,2075), (North American,2104), (Western Hemisphere,2246), (Maydet Vega,2800), (Mirach Capital Group,110), (Subrata Roy,403), (New York,820), (Saransh Sharma,945), (Federal Bureau,1440), (San Francisco,1482), (Gregory Wuthrich,1635), (San Francisco,1652), (Dan Levine,2309), (Emily Flitter,2327), (K...

如您所见,这是一个元组数组,我不能在上面使用 .reduceByKey(_ + _(。

有没有办法以可以通过键减少的方式拆分字符串?

如果您的RDD具有以下行:

[date, "topic1,10;topic2,12;topic1,3"]  

您可以使用flatMap拆分值并将行分解为:

[date, ["topic1,10", "topic2,12", "topic1,3"]] ->
[date, "topic1,10"]  
[date, "topic2,12"]  
[date, "topic1,3"]

然后将每一行转换为[字符串,整数]元组(代码中的rdd1(:

["date_topic1",10]  
["date_topic2",12]  
["date_topic1",3]

并使用加法(代码中rdd2(按键减少(:

["date_topic1",13]  
["date_topic2",12]  

然后,您将日期与主题分开,并将主题与值组合在一起,得到[字符串,字符串]元组,如下所示:

["date", "topic1,13"]  
["date", "topic2,12"]  

最后,您将值拆分为 [topic,count] 元组,准备["date", [(topic,count)]]对(代码中rdd3(并通过键减少(代码中rdd4(,得到:

["date", [(topic1, 13), (topic2, 12)]]

===下面是
作为四个中间RDD序列的Java实现,您可以参考它进行Scala开发:

JavaPairRDD<String, String> rdd;     //original data. contains [date, "topic1,10;topic2,12;topic1,3"] 
JavaPairRDD<String, Integer> rdd1 =  //contains
//["date_topic1",10]  
//["date_topic2",12]  
//["date_topic1",3]

rdd.flatMapToPair(
pair -> //pair=[date, "topic1,10;topic2,12;topic1,3"]
{
List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
String k = pair._1; //date
String v = pair._2; //"topic,count;topic,count;topic,count"
String[] v_splits = v.split(";");
for(int i=0; i<v_splits.length; i++)
{
String[] v_split_topic_count = v_splits[i].split(",");  //"topic,count"
list.add(new Tuple2<String,Integer>(k + "_" + v_split_topic_count[0], Integer.parseInt(v_split_topic_count[1]))); //"date_topic,count"
}
return list.iterator();
}//end call
);

JavaPairRDD<String,Integer> rdd2 = //contains
//["date_topic1",13]  
//["date_topic2",12]  

rdd1.reduceByKey((Integer i1, Integer i2) -> i1+i2);     

JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd3 = //contains
//["date", [(topic1,13)]]  
//["date", [(topic2,12)]]  
rdd2.mapToPair(
pair -> //["date_topic1",13]
{
String k  = pair._1; //date_topic1
Integer v = pair._2; //13

String[] dateTopicSplits = k.split("_");
String new_k = dateTopicSplits[0]; //date                    
List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
list.add(new Tuple2<String,Integer>(dateTopicSplits[1], v)); //[(topic1,13)]
return new Tuple2<String,Iterator<Tuple2<String,Integer>>>(new_k, list.iterator());
}
);
JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd4 = //contains
//["date", [(topic1, 13), (topic2, 12)]]
rdd3.reduceByKey(
(Iterator<Tuple2<String,Integer>> itr1, Iterator<Tuple2<String,Integer>> itr2) ->
{
List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
while(itr1.hasNext())
list.add(itr1.next());
while(itr2.hasNext())
list.add(itr2.next());
return list.iterator();
}
);

上。这个问题实际上可以通过仅使用单个map来解决 - 您将行值(即主题字符串(拆分为;,因此它将[键,值]对作为[主题,计数],并且您通过这些对将计数相加来填充哈希图。最后,输出date键,其中包含哈希图中累积的所有不同键及其值。
这种方式似乎也更有效,因为哈希映射的大小不会大于原始行的大小,因此映射器消耗的内存空间将受到最大行大小的限制,而在平面映射解决方案中,内存应该足够大以容纳所有这些扩展行

最新更新