Async Spark RDD Computation for variance/std/mean



In my code, I run several computations separately over three different columns to calculate variance/std/mean, etc. The problem is that this takes quite a long time to run, because each column's values have to be re-mapped before the variance can be computed.

Is it possible to run all three of these statements asynchronously at the same time, and retrieve the final values in the 3 variables specified in the example below?

final Double varSHOUR               = dataset.mapToDouble(new DoubleFunction<modelEhealth>() {
    @Override
    public double call(modelEhealth modelEhealth) throws Exception {
        return modelEhealth.getSHOUR();
    }
}).variance();
final Double varHOURLYFRAMESIN      = dataset.mapToDouble(new DoubleFunction<modelEhealth>() {
    @Override
    public double call(modelEhealth modelEhealth) throws Exception {
        return modelEhealth.getHOURLYFRAMESIN();
    }
}).variance();
final Double varHOURLYFRAMESOUT     = dataset.mapToDouble(new DoubleFunction<modelEhealth>() {
    @Override
    public double call(modelEhealth modelEhealth) throws Exception {
        return modelEhealth.getHOURLYFRAMESOUT();
    }
}).variance();
You'd have to mimic Spark's JavaDoubleRDD.variance() implementation, but using your ModelHealth class instead of Double. That's not hard to do, because you can use Spark's StatCounter for the actual calculations; you just need 3 of them.
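StatCounter accumulates count, mean, and the sum of squared deviations in a single pass and supports merging values one at a time. As a rough sketch of what it does internally (this is a simplified stand-in, not Spark's actual org.apache.spark.util.StatCounter), the core update is a Welford-style formula:

```java
// Simplified stand-in for Spark's StatCounter: tracks count, running mean,
// and sum of squared deviations (m2) with single-pass Welford-style updates.
public class SimpleStatCounter {
    private long n = 0;      // number of values merged so far
    private double mu = 0.0; // running mean
    private double m2 = 0.0; // running sum of squared deviations from the mean

    // Merge a single value into the running statistics.
    public SimpleStatCounter merge(double value) {
        n++;
        double delta = value - mu;
        mu += delta / n;
        m2 += delta * (value - mu);
        return this;
    }

    public double mean()     { return mu; }
    // Population variance, which is what Spark's StatCounter.variance() returns.
    public double variance() { return n == 0 ? Double.NaN : m2 / n; }
    public double stdev()    { return Math.sqrt(variance()); }

    public static void main(String[] args) {
        SimpleStatCounter c = new SimpleStatCounter();
        for (double v : new double[] {1.0, 2.0, 3.0, 4.0}) {
            c.merge(v);
        }
        System.out.println(c.mean());     // 2.5
        System.out.println(c.variance()); // 1.25
    }
}
```

Because one counter carries mean, variance, and stdev at once, a single pass over the data can answer all three of the question's statistics per column.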

For this example, I'll use a simple ModelHealth containing 3 Double fields, v1, v2, v3:

static class ModelHealth implements Serializable { // Spark ships records to executors, so the class must be serializable
    final Double v1;
    final Double v2;
    final Double v3;

    ModelHealth(Double v1, Double v2, Double v3) { // final fields must be initialized
        this.v1 = v1;
        this.v2 = v2;
        this.v3 = v3;
    }
}

然后:

JavaRDD<ModelHealth> dataset = // your data
// zero value - three empty StatCounters:
final Tuple3<StatCounter, StatCounter, StatCounter> zeroValue = new Tuple3<>(new StatCounter(), new StatCounter(), new StatCounter());
// using `aggregate` to aggregate ModelHealth records into three StatCounters:
final Tuple3<StatCounter, StatCounter, StatCounter> stats = dataset.aggregate(zeroValue, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, ModelHealth, Tuple3<StatCounter, StatCounter, StatCounter>>() {
    @Override
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> stats, ModelHealth record) throws Exception {
        // merging the record into the tuple of StatCounters - each value merged with its corresponding counter
        stats._1().merge(record.v1);
        stats._2().merge(record.v2);
        stats._3().merge(record.v3);
        return stats;
    }
}, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>>() {
    @Override
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> v1, Tuple3<StatCounter, StatCounter, StatCounter> v2) throws Exception {
        // merging tuples of StatCounters - each counter merged with corresponding one
        v1._1().merge(v2._1());
        v1._2().merge(v2._2());
        v1._3().merge(v2._3());
        return v1;
    }
});
Double v1_variance = stats._1().variance();
Double v2_variance = stats._2().variance();
Double v3_variance = stats._3().variance();

This gives you the same result as your code, but with a single aggregation over the dataset.
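If you still want the concurrent behavior the question asks about, Spark's scheduler does accept jobs submitted from multiple threads, so the three variance() actions could be fired off via an ExecutorService and collected from Futures. A sketch of that submission pattern, using plain Java arrays and an illustrative varianceOf helper in place of the actual RDD actions (varianceOf is not a Spark API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentVariance {
    // Illustrative helper: population variance of a double array, standing in
    // for an RDD variance() action in this sketch.
    static double varianceOf(double[] values) {
        double mean = 0.0;
        for (double v : values) mean += v;
        mean /= values.length;
        double m2 = 0.0;
        for (double v : values) m2 += (v - mean) * (v - mean);
        return m2 / values.length;
    }

    public static void main(String[] args) throws Exception {
        double[] shour     = {1.0, 2.0, 3.0, 4.0};
        double[] framesIn  = {2.0, 2.0, 4.0, 4.0};
        double[] framesOut = {0.0, 10.0};

        // Submit the three computations concurrently; with Spark you would
        // submit the three variance() actions from separate threads the same way.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        Future<Double> varSHOUR = pool.submit(() -> varianceOf(shour));
        Future<Double> varIn    = pool.submit(() -> varianceOf(framesIn));
        Future<Double> varOut   = pool.submit(() -> varianceOf(framesOut));

        System.out.println(varSHOUR.get()); // 1.25
        System.out.println(varIn.get());    // 1.0
        System.out.println(varOut.get());   // 25.0
        pool.shutdown();
    }
}
```

Note that this still scans the dataset three times; the single aggregate above avoids that entirely, which is why it is the better fix for the runtime problem.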
