对于Spark RDD联盟来说,这是非常缓慢的



我有2个spark RDD,dataRDD和newPairDataRDD,用于Spark SQL查询。当我的应用程序初始化时,dataRDD 将被初始化。一个指定的 hbase 实体中的所有数据都将存储到 dataRDD 中。

当客户端的sql查询到来时,我的APP会得到所有新的更新和插入到newPairDataRDD。dataRDD 将 newPairDataRDD 联合起来,并在 Spark SQL 上下文中注册为表。

我在dataRDD中甚至找到了0条记录,在newPairDataRDD中找到了1条新插入的记录。联合需要 4 秒。太慢了

我认为这是不合理的。有人知道如何让它更快吗?谢谢简单的代码如下

    // Step1: load all data from hbase to dataRDD when initial, this only run once. 
    JavaPairRDD<String, Row>  dataRDD= getAllBaseDataToJavaRDD();
    dataRDD.cache();
    dataRDD.persist(StorageLevel.MEMORY_ONLY());
    logger.info(dataRDD.count());
    // Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD
    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
    // Step3: if count>0 do union and reduce
       if(newPairDataRDD.count() > 0) {
        JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);
    // if data was updated in DB, need to delete the old version from the dataRDD.
        dataRDD = unionedRDD.reduceByKey(
            new Function2<Row, Row, Row>() {
            // @Override
            public Row call(Row r1, Row r2) {
             return r2;
             }
            });
    }
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);
//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

从火花网络 UI 中,我可以看到下面。显然它需要 4s 才能联合

已完成阶段 (8)

阶段 ID 说明 已提交 持续时间 任务:成功/总输入 随机 读取 随机写入

6 收集在 SparkPlan.scala:85+详情 2015/1/4 8:17 2 秒 8-8月 156.0 B

7 联合在 SparkSqlQueryForMars新.java:389+详细信息 2015/1/4 8:17 4 秒 8-8月 64.0 字节 156.0 字节

实现所需目标的更有效方法是使用cogroup()flatMapValues(),使用联合除了向dataRDD添加新分区外,几乎没有什么作用,这意味着所有数据必须在reduceByKey()之前洗牌。cogroup()flatMapValues()将导致仅对newPairDataRDD进行重新分区。

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD);
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
    new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() {
        public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) {
            if (grouped._2.nonEmpty()) {
                return grouped._2;
            } else {
                return grouped._1;
            }
        }
    });

或在斯卡拉

val unioned = dataRDD.cogroup(newPairDataRDD)
val updated = unioned.flatMapValues { case (oldVals, newVals) =>
    if (newVals.nonEmpty) newVals else oldVals
}

免责声明,我不习惯用 Java 编写 spark!如果以上有误,请有人纠正我!

尝试对RDD进行重新分区:

JavaPairRDD unionedRDD =dataRDD.repartition(sc.defaultParallelism * 3).

union(newPairDataRDD.repartition(sc.defaultParallelism * 3));

相关内容

  • 没有找到相关文章

最新更新