Scala变量作用域和for循环



首先,我是一名C#开发人员,对Scala非常陌生。我们正在尝试使用Spark来查询SQL和Cassandra,这是一个概念验证程序。

 var output: StringBuilder = new StringBuilder();
  try {
    //var output: StringBuilder = new StringBuilder();
    var config = new Config(args);
    val sparkConf = new SparkConf(true)
      .set("spark.cassandra.connection.host", config.cassanrdaClusterIp).set("spark.akka.heartbeat.interval", "100")
    //var output: StringBuilder = new StringBuilder();
    val sparkContext: SparkContext = new SparkContext(config.sparkConnectionString, "springcm-spark-webserver", sparkConf)
    val sqlContext = new CassandraSQLContext(sparkContext)
    val sqlConnectionString = "jdbc:sqlserver://" + config.sqlConnectionString;
    if (args(0) == "DocHistoryReport") {
    val docHistoryReport = new DocHistoryReport(sparkContext, sqlContext, sqlConnectionString, config.cassanrdaKeyspace)
    // var output: StringBuilder = new StringBuilder();
    var result = docHistoryReport.Execute(config.accountId, config.userId, config.startDate, config.endDate, config.dateBucketType);
    result.collect();
    var file: File = new File("result.csv");
    //  var output: StringBuilder = new StringBuilder();
    if (!file.exists()) {
      file.createNewFile();
    }
    val pw = new PrintWriter(file);
    result.foreach(row => {
      output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
    })
    pw.write(output.toString());
    pw.flush();
    pw.close;
  }
  else {
    throw new IllegalArgumentException("Unsuported report type " + args(0));
  }
}

该代码创建一个spark上下文,运行一个简单的报告并将结果写入文件。请注意,变量输出在代码中被初始化了几次,但除一次外,其余都被注释掉了。如果在当前声明的位置以外的任何位置初始化输出,则result.csv文件将为空,并且在执行for循环擦除结果期间,输出变量将被重新初始化多次。

有人能解释一下发生了什么以及为什么变量初始化的位置很重要吗。谢谢

如果这实际上取决于初始化位置,我会非常惊讶:在任何情况下,预期的结果都是空文件。

result.foreach(row => {
  output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})

与文档中的示例非常接近:

var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)

并且具有相同的问题:foreachrow => ...)的自变量被序列化并发送给每个工作者,并且在反序列化时,它会创建一个新的输出,而不是引用不同进程或计算机上的原始StringBuilder。阅读上面的链接部分可以帮助你理解更多。

可能的解决方案:

  1. 使用collect将RDD的所有元素获取到驱动程序,并对结果调用foreach(事实上,您的程序已经这样做了,但将结果丢弃)。

  2. foreach调用中放入写入文件的代码(显然,它必须是一个可以从工作机器访问的文件,例如使用HDFS)。

  3. 使用蓄能器。

  4. 使用mapmapPartition在每个工作线程上分别构建所需的字符串(获得RDD[String]),并将上述任一解决方案应用于结果。

相关内容

  • 没有找到相关文章

最新更新