如何提高 Spark 性能?



我有处理大型数据集的Java程序。数据集存储在 hdfs (csv( 中。

该程序运行良好,但速度非常慢。

程序的作用:

  1. 加载 CSV 文件
  2. 将单独的行转换为字符串[]
  3. 过滤器字符串数组
  4. 映射到我的对象
  5. 将我的对象保存到卡桑德拉

有我的主要方法:

public static void main(String[] args) {
// configure spark
SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
.setMaster("local[*]")
.set("spark.executor.memory", "4g");
if (args.length > 1)
sparkConf.set("spark.cassandra.connection.host", args[1]);
// start a spark context
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// read text file to RDD
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<MyObject> myObjectJavaRDD = lines
.map(line -> line.split(","))
.filter(someFilter)
.map(MyObject::new);
javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
}

如何提高性能?

谢谢你的回答。

你的代码没有随机问题(除非你必须写出到HDFS(,默认分区是由输入格式定义的,在Hadoop上,HDFS核心和过滤器或映射不会改变分区。如果你可以先过滤,你可以看到一些改进

JavaRDD<MyObject> myObjectJavaRDD = lines
.filter(someFilter)
.map(line -> line.split(","))
.map(MyObject::new);

Spark 对于 RDD 的每个分区只能运行 1 个并发任务,最多 群集中的核心。因此,如果您有一个包含 50 个内核的集群,您希望您的 RDD 至少 有 50 个分区。就选择"好"数量的分区而言,您通常至少需要与 并行性的执行程序数。可以通过调用来获取此计算值

sc.defaultParallelism

或通过以下方式检查RDD分区编号

someRDD.partitions.size

通过使用读取文件来创建 RDD 时

rdd = SparkContext().textFile("hdfs://…/file.txt") 

分区数可能更小。理想情况下,你会得到相同的 您在 HDFS 中看到的块数,但如果文件中的行太长(长于 块大小(,分区会更少。

设置 RDD 分区数的首选方法是直接将其作为 调用中的第二个输入参数,如

rdd = sc.textFile("hdfs://… /file.txt", 400) 

其中 400 是分区数。在这种情况下,分区进行 400 次拆分,这将 由Hadoop的TextInputFormat完成,而不是Spark,它会更快地工作。它 此外,代码生成 400 个并发任务以尝试将文件.txt直接加载到 400 中 分区。

重新分区:增加分区,在过滤器增加平行后重新平衡分区

repartition(numPartitions: Int)

合并:在输出到 HDFS/外部之前减少分区,无需随机合并

coalesce(numPartitions: Int, suffle: Boolean = false)

最后,同样重要的是,你可以用不同的值和基准做一些试验,看看这个过程花了多少时间。

val start = System.nanoTime()
// my process
val end = System.nanoTime()
val time = end - start
println(s"My App takes: $time")

我希望,它有帮助

最新更新