I'm new to Scala. I want to read data from an Oracle database on each Spark node and convert it into a Spark DataFrame. The code is as follows:
def read_data(group_id: Int):String = {
  val table_name = "table"
  val col_name = "col"
  val query =
    """ select f1,f2,f3,f4,f5,f6,f7,f8
      | from """.stripMargin + table_name + """ where MOD(TO_NUMBER(substr("""+col_name+""", -LEAST(2, LENGTH("""+col_name+""")))),"""+num_node+""")="""+group_id
  val oracleUser = "ORCL"
  val oraclePassword = "*******"
  val oracleURL = "jdbc:oracle:thin:@//x.x.x.x:1521/ORCLDB"
  val ods = new OracleDataSource()
  ods.setUser(oracleUser)
  ods.setURL(oracleURL)
  ods.setPassword(oraclePassword)
  val con = ods.getConnection()
  val statement = con.createStatement()
  statement.setFetchSize(1000) // important
  val resultSet : java.sql.ResultSet = statement.executeQuery(query)
  var ret = " "
  while(resultSet.next()) {
    for {i <- 1 until 8 by 1
      ret = ret.concat(resultSet.getString(i))
      ret = ret.concat(" ")
    }yield(ret)
    return ret
  }
  println("ret:",ret)
  return ret
}
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("testScala")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "2")
  .set("spark.task.cpus","1")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(group_list,num_node)
  .map(read_data).map(x => println(x)).count()
println("rdd:",rdd)
The part of the code I'm having trouble with is this:
var ret = " "
while(resultSet.next()) {
  for (i <- 1 until 8 by 1) {
    ret = ret.concat(resultSet.getString(i))
    ret = ret.concat(" ")
    return ret
  }
  return ret
}
println("ret:",ret)
The println("ret:",ret) call prints a null string. When I change the code like this:
var ret = " "
while(resultSet.next()) {
  for {i <- 1 until 8 by 1
    ret = ret.concat(resultSet.getString(i))
    ret = ret.concat(" ")
  }yield(ret)
  return ret
}
I get this error:
ret is already defined as value ret
ret = ret.concat(" ")
In fact, even before running it, I can see that the IDE reports a problem with concat:
Cannot resolve symbol concat
Please show me how to get the result of the while/for loops outside of them.
Any help is much appreciated.
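The query that read_data builds by concatenating triple-quoted fragments is easier to check when written with Scala's s-interpolator. Below is a minimal sketch; OracleQueries, bucketExpr, buildQuery and predicates are hypothetical names, and it assumes the group ids in group_list run from 0 to num_node - 1. Because the end goal is a DataFrame, the predicates array is shaped for Spark's built-in JDBC reader, spark.read.jdbc(url, table, predicates, connectionProperties), where each predicate defines one partition. That reader is a swapped-in alternative to the per-node connection code above, and the sketch itself does not call Spark.

```scala
// Hypothetical helpers: tableName, colName, numNode and groupId play the roles of
// table_name, col_name, num_node and group_id in the question's read_data.
object OracleQueries {

  // The question's bucketing expression: the last (up to) two characters of
  // colName, converted to a number, modulo numNode.
  def bucketExpr(colName: String, numNode: Int): String =
    s"MOD(TO_NUMBER(SUBSTR($colName, -LEAST(2, LENGTH($colName)))), $numNode)"

  // The same SELECT that read_data concatenates together, for one group.
  def buildQuery(tableName: String, colName: String, numNode: Int, groupId: Int): String =
    s"SELECT f1, f2, f3, f4, f5, f6, f7, f8 FROM $tableName " +
      s"WHERE ${bucketExpr(colName, numNode)} = $groupId"

  // One WHERE-clause predicate per group id in 0 until numNode (assumes these
  // ids cover exactly the possible MOD results for non-negative numbers).
  def predicates(colName: String, numNode: Int): Array[String] =
    (0 until numNode).map(g => s"${bucketExpr(colName, numNode)} = $g").toArray
}
```

For example, OracleQueries.buildQuery("table", "col", num_node, group_id) reproduces the query for a single group.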
You can replace your code:
var ret = " "
while(resultSet.next()) {
  for {i <- 1 until 8 by 1
    ret = ret.concat(resultSet.getString(i))
    ret = ret.concat(" ")
  }yield(ret)
  return ret
}
with:
val ret = Iterator.continually(resultSet)
  .takeWhile(_.next())
  .flatMap(r => (1 until 8).map(i => r.getString(i)))
  .mkString(" ")
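To see what this chain does without a database, here is a sketch that runs the same Iterator.continually / takeWhile / flatMap / mkString pipeline over FakeCursor, a hypothetical class that mimics only the next() and getString(i) methods of java.sql.ResultSet. The column count is a parameter in the sketch; note that 1 until 8 reads columns 1 to 7, while the query selects eight columns (f1 to f8).

```scala
// FakeCursor is a hypothetical stand-in for java.sql.ResultSet: it models only
// next() (advance; false when no rows remain) and getString(i) (1-based column).
final class FakeCursor(rows: Seq[Seq[String]]) {
  private var pos = -1
  def next(): Boolean = { pos += 1; pos < rows.length }
  def getString(i: Int): String = rows(pos)(i - 1)
}

object CursorDemo {
  // Same pipeline as the answer: hand out the same cursor repeatedly, stop at the
  // first next() that returns false, read columns 1..numCols of the current row,
  // and join everything with single spaces. (1 to numCols).map is strict, so a
  // row's columns are all read before takeWhile advances the cursor again.
  def joinRows(rs: FakeCursor, numCols: Int): String =
    Iterator.continually(rs)
      .takeWhile(_.next())
      .flatMap(r => (1 to numCols).map(i => r.getString(i)))
      .mkString(" ")
}
```

With two rows ("a", "b") and ("c", "d") and numCols = 2, joinRows returns "a b c d"; an empty cursor gives "".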
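You are using a for-comprehension here. Inside its braces, each ret = ... line is a value definition, so what it actually does is create a new val named ret instead of assigning to your var. What you wrote is evaluated as: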
for (i <- 1 until 8 by 1) {
  val ret = ret.concat(resultSet.getString(i))
  val ret = ret.concat(" ")
} yield(ret)
You could use:
for {
  i <- 1 until 8 by 1
  _ = ret = ret.concat(resultSet.getString(i))
  _ = ret = ret.concat(" ")
} yield(ret)
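A runnable sketch of the `_ =` trick, with a plain Seq of column values standing in for the ResultSet (ForComprehensionDemo and concatAll are hypothetical names). It shows that ret is updated in place. As the comment notes, the yielded values are better not relied on as intermediate steps.

```scala
object ForComprehensionDemo {
  // Each `_ = ret = ...` is a value definition whose right-hand side is an
  // assignment, so it updates the var ret instead of defining a new val ret.
  def concatAll(columns: Seq[String]): (String, Seq[String]) = {
    var ret = " "
    val yielded =
      for {
        c <- columns
        _ = ret = ret.concat(c)
        _ = ret = ret.concat(" ")
      } yield ret
    // The value definitions are desugared into a map that may run for every
    // element before the yield is evaluated, so on a strict Seq the yielded
    // strings may all equal the final ret. The full result is read from ret.
    (ret, yielded)
  }
}
```

For Seq("a", "b", "c"), the returned ret is " a b c " and three values are yielded.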
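Or, to simplify the whole loop, replace it with the following. (I'm not sure what you intend this code to do: I assume you want the whole concatenated string ret, but since you used yield, I assume you also need the intermediate steps of the process.)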
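val ret = new StringBuilder(" ")
var steps: Seq[String] = Nil
while (resultSet.next()) {
  // append this row's columns, keeping a snapshot of ret after each one
  // (1 until 8 reads columns 1 to 7, as in the question)
  steps = steps ++ (for (i <- 1 until 8) yield {
    ret.append(resultSet.getString(i)).append(" ")
    ret.toString
  })
}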
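Here is a self-contained sketch of the same idea, with a Seq of rows standing in for the ResultSet (StepsDemo and build are hypothetical names). The full string is built in a StringBuilder, and a snapshot is recorded after each column with a plain for loop, so the snapshots really are the intermediate steps.

```scala
object StepsDemo {
  // rows stands in for the ResultSet: one inner Seq of column values per row.
  // Returns the full concatenated string plus a snapshot taken after each column.
  def build(rows: Seq[Seq[String]]): (String, Seq[String]) = {
    val ret = new StringBuilder(" ")
    val steps = Seq.newBuilder[String]
    for (row <- rows; value <- row) {
      ret.append(value).append(" ")
      steps += ret.toString
    }
    (ret.toString, steps.result())
  }
}
```

For rows ("a", "b") and ("c"), build returns " a b c " together with the steps " a ", " a b " and " a b c ".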