首先,我是一名C#开发人员,对Scala非常陌生。我们正在尝试使用Spark来查询SQL和Cassandra,这是一个概念验证程序。
var output: StringBuilder = new StringBuilder();
try {
//var output: StringBuilder = new StringBuilder();
var config = new Config(args);
val sparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", config.cassanrdaClusterIp).set("spark.akka.heartbeat.interval", "100")
//var output: StringBuilder = new StringBuilder();
val sparkContext: SparkContext = new SparkContext(config.sparkConnectionString, "springcm-spark-webserver", sparkConf)
val sqlContext = new CassandraSQLContext(sparkContext)
val sqlConnectionString = "jdbc:sqlserver://" + config.sqlConnectionString;
if (args(0) == "DocHistoryReport") {
val docHistoryReport = new DocHistoryReport(sparkContext, sqlContext, sqlConnectionString, config.cassanrdaKeyspace)
// var output: StringBuilder = new StringBuilder();
var result = docHistoryReport.Execute(config.accountId, config.userId, config.startDate, config.endDate, config.dateBucketType);
result.collect();
var file: File = new File("result.csv");
// var output: StringBuilder = new StringBuilder();
if (!file.exists()) {
file.createNewFile();
}
val pw = new PrintWriter(file);
result.foreach(row => {
output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})
pw.write(output.toString());
pw.flush();
pw.close;
}
else {
throw new IllegalArgumentException("Unsuported report type " + args(0));
}
}
该代码创建一个spark上下文,运行一个简单的报告并将结果写入文件。请注意,变量输出在代码中被初始化了几次,但除一次外,其余都被注释掉了。如果在当前声明的位置以外的任何位置初始化输出,则result.csv文件将为空,并且在执行for循环擦除结果期间,输出变量将被重新初始化多次。
有人能解释一下发生了什么以及为什么变量初始化的位置很重要吗。谢谢
如果这实际上取决于初始化位置,我会非常惊讶:在任何情况下,预期的结果都是空文件。
result.foreach(row => {
output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})
与文档中的示例非常接近:
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
并且具有相同的问题:foreach
(row => ...
)的自变量被序列化并发送给每个工作者,并且在反序列化时,它会创建一个新的输出,而不是引用不同进程或计算机上的原始StringBuilder
。阅读上面的链接部分可以帮助你理解更多。
可能的解决方案:
使用
collect
将RDD的所有元素获取到驱动程序,并对结果调用foreach
(事实上,您的程序已经这样做了,但将结果丢弃)。在
foreach
调用中放入写入文件的代码(显然,它必须是一个可以从工作机器访问的文件,例如使用HDFS)。使用蓄能器。
使用
map
或mapPartition
在每个工作线程上分别构建所需的字符串(获得RDD[String]
),并将上述任一解决方案应用于结果。