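In my code I run several computations on three different columns to compute variance, standard deviation, mean, etc. The problem is that this takes quite a long time, because for each column the values have to be re-mapped and the variance then computed separately.
Is it possible to run all three of these statements asynchronously at the same time and retrieve the final values into the three variables shown in the example below?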
final Double varSHOUR = dataset.mapToDouble(new DoubleFunction<modelEhealth>() {
    @Override
    public double call(modelEhealth modelEhealth) throws Exception {
        return modelEhealth.getSHOUR();
    }
}).variance();
final Double varHOURLYFRAMESIN = dataset.mapToDouble(new DoubleFunction<modelEhealth>() {
    @Override
    public double call(modelEhealth modelEhealth) throws Exception {
        return modelEhealth.getHOURLYFRAMESIN();
    }
}).variance();
final Double varHOURLYFRAMESOUT = dataset.mapToDouble(new DoubleFunction<modelEhealth>() {
    @Override
    public double call(modelEhealth modelEhealth) throws Exception {
        return modelEhealth.getHOURLYFRAMESOUT();
    }
}).variance();
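Rather than running the three computations asynchronously, you can compute all three variances in a single pass over the data. To do that, you'll have to mimic Spark's JavaDoubleRDD.variance() implementation (which builds a StatCounter over the values and reads its variance) using your ModelHealth class instead of Double. That isn't hard, because Spark's StatCounter can do the actual calculation; you just need three of them, one per column.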
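Three counters can be filled in one pass and combined across partitions because a StatCounter-style summary (count n, mean μ, and M2, the sum of squared deviations from the mean) merges exactly. The formula below is the standard pairwise merge for two partial summaries A and B; my understanding is that StatCounter.merge(StatCounter) follows the same idea, but the notation here is my own, not Spark's:

```latex
\begin{aligned}
n &= n_A + n_B, \qquad \delta = \mu_B - \mu_A,\\
\mu &= \mu_A + \delta\,\frac{n_B}{n},\\
M_2 &= M_{2,A} + M_{2,B} + \delta^2\,\frac{n_A\,n_B}{n},\\
\operatorname{Var} &= \frac{M_2}{n} \quad \text{(population variance)}.
\end{aligned}
```

Adding a single value x is the special case n_B = 1, μ_B = x, M2_B = 0. As a quick check, partitions {1, 2} and {3, 4} summarize to (n, μ, M2) = (2, 1.5, 0.5) and (2, 3.5, 0.5); merging gives n = 4, μ = 2.5, M2 = 0.5 + 0.5 + 2² · (2 · 2)/4 = 5, so the variance is 5/4 = 1.25, the same as computing it over {1, 2, 3, 4} directly.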
For this example I'll use a simple ModelHealth with three Double fields, v1, v2 and v3:
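static class ModelHealth implements java.io.Serializable { // Spark may need to serialize records
    final Double v1;
    final Double v2;
    final Double v3;
    ModelHealth(Double v1, Double v2, Double v3) { // final fields must be set in a constructor
        this.v1 = v1;
        this.v2 = v2;
        this.v3 = v3;
    }
}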
Then:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.util.StatCounter;
import scala.Tuple3;

JavaRDD<ModelHealth> dataset = // your data
// zero value - three empty StatCounters:
final Tuple3<StatCounter, StatCounter, StatCounter> zeroValue = new Tuple3<>(new StatCounter(), new StatCounter(), new StatCounter());
// using `aggregate` to aggregate ModelHealth records into three StatCounters
// (Spark allows seqOp/combOp to modify and return their first argument, so merging in place is fine):
final Tuple3<StatCounter, StatCounter, StatCounter> stats = dataset.aggregate(zeroValue, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, ModelHealth, Tuple3<StatCounter, StatCounter, StatCounter>>() {
    @Override
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> stats, ModelHealth record) throws Exception {
        // merging record into tuple of StatCounters - each value merged with corresponding counter
        stats._1().merge(record.v1);
        stats._2().merge(record.v2);
        stats._3().merge(record.v3);
        return stats;
    }
}, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>>() {
    @Override
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> v1, Tuple3<StatCounter, StatCounter, StatCounter> v2) throws Exception {
        // merging tuples of StatCounters - each counter merged with corresponding one
        v1._1().merge(v2._1());
        v1._2().merge(v2._2());
        v1._3().merge(v2._3());
        return v1;
    }
});
// StatCounter.variance() is the population variance, like JavaDoubleRDD.variance()
Double v1_variance = stats._1().variance();
Double v2_variance = stats._2().variance();
Double v3_variance = stats._3().variance();
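This gives the same result as your code, but with a single aggregation over the dataset instead of three separate jobs. Each StatCounter also exposes mean() and stdev(), among others, so the other statistics you compute can come out of the same pass.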
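To see the single-pass pattern without a Spark cluster, here is a plain-Java sketch (standard library only) that models what aggregate does with three counters: each partition is folded row by row with a seqOp starting from a fresh zero value, and the partial results are merged with a combOp. Counter is a hypothetical, simplified stand-in for Spark's StatCounter (it tracks only count, mean and M2), Row is a hypothetical stand-in for ModelHealth, and the partition loop is a sequential simulation; real Spark processes partitions in parallel on executors.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Spark) of the single-pass pattern used with aggregate():
// one accumulator per column, a seqOp per record and a combOp per partition.
// Counter is a hypothetical, simplified stand-in for Spark's StatCounter.
class SinglePassVariance {

    // Tracks count, mean and M2 (sum of squared deviations from the mean).
    static final class Counter {
        long n = 0;
        double mean = 0.0;
        double m2 = 0.0;

        // Fold one value in (Welford's update).
        Counter merge(double x) {
            n++;
            double delta = x - mean;
            mean += delta / n;
            m2 += delta * (x - mean);
            return this;
        }

        // Combine with another partial counter (pairwise merge formula).
        Counter merge(Counter o) {
            if (o.n == 0) return this;
            if (n == 0) {
                n = o.n; mean = o.mean; m2 = o.m2;
                return this;
            }
            long total = n + o.n;
            double delta = o.mean - mean;
            mean += delta * o.n / total;
            m2 += o.m2 + delta * delta * n * o.n / total;
            n = total;
            return this;
        }

        // Population variance (divides by n); NaN for an empty counter.
        double variance() {
            return n == 0 ? Double.NaN : m2 / n;
        }
    }

    // Hypothetical record type with three numeric columns, like ModelHealth.
    static final class Row {
        final double v1, v2, v3;
        Row(double v1, double v2, double v3) { this.v1 = v1; this.v2 = v2; this.v3 = v3; }
    }

    // Zero value: three empty counters.
    static Counter[] zero() {
        return new Counter[] {new Counter(), new Counter(), new Counter()};
    }

    // seqOp: each column's value goes into that column's counter.
    static Counter[] seqOp(Counter[] acc, Row r) {
        acc[0].merge(r.v1);
        acc[1].merge(r.v2);
        acc[2].merge(r.v3);
        return acc;
    }

    // combOp: merge two partial results column by column.
    static Counter[] combOp(Counter[] a, Counter[] b) {
        for (int i = 0; i < a.length; i++) a[i].merge(b[i]);
        return a;
    }

    // Sequential simulation of aggregate(): fold each partition from its own
    // zero value, then combine the partial results. Each row is visited once.
    static double[] variances(List<List<Row>> partitions) {
        Counter[] result = zero();
        for (List<Row> partition : partitions) {
            Counter[] acc = zero();
            for (Row r : partition) seqOp(acc, r);
            combOp(result, acc);
        }
        return new double[] {result[0].variance(), result[1].variance(), result[2].variance()};
    }

    public static void main(String[] args) {
        List<List<Row>> partitions = Arrays.asList(
                Arrays.asList(new Row(1, 10, 100), new Row(2, 20, 200)),
                Arrays.asList(new Row(3, 30, 300), new Row(4, 40, 400)));
        System.out.println(Arrays.toString(variances(partitions))); // prints [1.25, 125.0, 12500.0]
    }
}
```

Because every row is touched once and each column only updates its own counter, adding more columns adds more counters but no extra passes over the data; how the rows are split into partitions does not change the result.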