我需要在一个巨大的RDD上进行复杂的计算,但是为了简单起见,我将问题减少到了更简单的事情:
我有这样的javardd:
JavaRDD<Student> students = sc.parallelize(
javaFunctions(sc).cassandraTable("test", "school", mapRowTo(Student.class)).collect());
学生班看起来像:
public class Student implements Serializable{
Integer id;
Integer classroom;
String name;
Integer mark1;
Integer mark2;
// ... getters and setters
现在,我想在一个迭代中在每个教室中使用stddedv,avg for mark1和mark2列,如果可能是statcounter。我知道如何使用StatCounter,但是在
上JavaRDD<Numeric>
,就我而言,我有
JavaRDD<Student>
有什么想法?
谢谢
首先,从不:
sc.parallelize(someRDD.collect());
这不是一个好主意。像以往。
现在:
如果可能是Statcounter,则使用stddedv,Mark1和Mark2列的STDDEDV,AVG和Mark2列
是可能的,但是只需使用CASANDRA连接器使用DataFrame
:
import static org.apache.spark.sql.functions.*;
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "school", "keyspace" -> "test"))
.load()
.groupBy("classroom"))
.agg(mean("mark1"), stddev("mark1"), mean("mark2"), stddev("mark2"));
使用Stat计数器,您可以使用StatCounters
的Tuple2
转换JavaPairRDD<Integer,Tuple2<Integer,Integer>>
((class, (mark1, , mark2))
)和combineByKey
。您也可以用mllib.Vector
替换Tuple2
,并用MultivariateStatisticalSummary