How to convert a Spark RDD into a Spark DataFrame

I am working with Spark 3.1.2 and Scala 2.12. I want to parallelize a set of keys across the nodes, each of which reads data according to the key it receives. To do that, I first have to work with an RDD and then convert it into a Spark DataFrame. I read the data from a table in an Oracle database. The code is as follows:

import java.sql.ResultSet

import oracle.jdbc.pool.OracleDataSource
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object managementData extends App {

  val num_node = 2

  def read_data(group_id: Int): String = {
    val table_name = "table"
    val col_name = "col"
    // Each group fetches only the rows whose key column maps to its
    // group_id under MOD num_node.
    val query =
      s"""select f1,f2,f3,f4,f5,f6,f7,f8
         |from $table_name
         |where MOD(TO_NUMBER(substr($col_name, -LEAST(2, LENGTH($col_name)))), $num_node) = $group_id""".stripMargin
    val oracleUser = "ORCL"
    val oraclePassword = "XXXXXXXX"
    val oracleURL = "jdbc:oracle:thin:@//X.X.X.X:1521/ORCLDB"
    val ods = new OracleDataSource()
    ods.setUser(oracleUser)
    ods.setURL(oracleURL)
    ods.setPassword(oraclePassword)
    val con = ods.getConnection()
    val statement = con.createStatement()
    statement.setFetchSize(1000) // important
    val resultSet: ResultSet = statement.executeQuery(query)
    // Flatten every row's eight columns into one space-separated string.
    Iterator.continually(resultSet)
      .takeWhile(_.next)
      .flatMap(r => (1 to 8).map(i => r.getString(i))) // 1 to 8: JDBC columns are 1-based and 8 columns are selected
      .mkString(" ")
  }

  def udf(rdd: RDD[String]): DataFrame = {
    val spark = SparkSession.builder.getOrCreate()
    val schema = new StructType()
      .add(StructField("f1", StringType, true))
      .add(StructField("f2", StringType, true))
      .add(StructField("f3", StringType, true))
      .add(StructField("f4", StringType, true))
      .add(StructField("f5", StringType, true))
      .add(StructField("f6", StringType, true))
      .add(StructField("f7", IntegerType, true))
      .add(StructField("f8", IntegerType, true))
    val df = spark.createDataFrame(rdd, schema) // this is the line that fails to compile
    df
  }

  def main(): Unit = {
    val group_list = Seq.range(1, 2, 1) // making a list of group ids
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("testScala")
      .set("spark.executor.memory", "8g")
      .set("spark.executor.cores", "2")
      .set("spark.task.cpus", "1")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(group_list, num_node)
      .map(read_data)
    rdd.map(x => println(x)).collect() // debug print of the fetched strings
    udf(rdd)
  }

  main()
}

read_data works perfectly. However, I cannot convert the RDD into a Spark DataFrame and get this error:

overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_], beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.types.StructType)
  val df = spark.createDataFrame(rdd, schema)

Could you please tell me what is wrong with the way I use the createDataFrame method to convert the RDD into a Spark DataFrame?

Any help is greatly appreciated.
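For reference, every StructType overload in the error message above takes an RDD[Row] (or a JavaRDD[Row] / java.util.List[Row]), so an RDD[String] cannot be passed directly. Below is a minimal sketch of the conversion inside udf, assuming each RDD element holds exactly one record whose eight fields are separated by single spaces; note that read_data as written concatenates all of a group's rows into one string, so the elements would first have to be split back into individual records:

    import org.apache.spark.sql.Row

    // Sketch only: split each space-separated record into its fields,
    // convert f7/f8 to Int to match the schema, and wrap them in a Row.
    val rowRdd = rdd.map(_.split(" ")).map { a =>
      Row(a(0), a(1), a(2), a(3), a(4), a(5), a(6).toInt, a(7).toInt)
    }
    val df = spark.createDataFrame(rowRdd, schema) // now matches the RDD[Row] overload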

I need it to be this way:

...
val schema = new StructType()
  .add(StructField("f", StringType, true))
  .add(StructField("m", StringType, true))
  .add(StructField("l", StringType, true))
  .add(StructField("d", StringType, true))
  .add(StructField("g", StringType, true))
  .add(StructField("v", IntegerType, true))
val df = spark.createDataFrame(rdd, schema)
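
If building Row objects by hand is undesirable, a tuple plus toDF from spark.implicits achieves the same result. A minimal sketch, again assuming one space-separated record per RDD element and that the last field is numeric:

    import spark.implicits._ // spark is the SparkSession

    // Sketch only: map each record onto a typed tuple, then name the columns.
    val df = rdd
      .map(_.split(" "))
      .map(a => (a(0), a(1), a(2), a(3), a(4), a(5).toInt))
      .toDF("f", "m", "l", "d", "g", "v")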

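As an aside, Spark's built-in JDBC reader can run this kind of MOD-partitioned read in parallel and returns a DataFrame directly, with no manual RDD step. A minimal sketch under the same connection settings (oracleURL, oracleUser, oraclePassword, table_name and col_name as in the code above; the predicates are an assumption that mirrors the original WHERE clause):

    import java.util.Properties

    val props = new Properties()
    props.setProperty("user", oracleUser)
    props.setProperty("password", oraclePassword)
    props.setProperty("fetchsize", "1000") // same fetch size as the manual version

    // One predicate per partition; Spark issues one query per entry in parallel.
    val predicates = (0 until num_node).map { g =>
      s"MOD(TO_NUMBER(substr($col_name, -LEAST(2, LENGTH($col_name)))), $num_node) = $g"
    }.toArray

    val df = spark.read.jdbc(oracleURL, table_name, predicates, props)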