我是Spark和Scala的新手。我正在尝试从SQL服务器中的过程中获取内容以在Spark SQL中使用它。为此,我通过 Scala 中的 JDBCRDD 导入数据 (Eclipse) 并从该过程制作 RDD。
创建RDD后,我将其注册为临时表,然后使用sqlContext.sql("选择查询以选择特定列")。但是当我在选择查询中输入列名时,它会抛出错误,因为我在 RDD 和临时表中都没有列名。
请在下面找到我的代码:
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val url = XXXX
val username = XXXX
val password = XXXX
val query = "select A, B, C, D from Random_Procedure where ID_1 = ? and ID_2 = ?"
// New SparkContext
val sc = new SparkConf().setMaster("local").setAppName("Amit")
val sparkContext = new SparkContext(sc)
val rddData = new JdbcRDD(sparkContext, () =>
DriverManager.getConnection(url, username, password),
query, 1, 0, 1, (x: ResultSet) => x.getString("A") + ", " +
x.getString("B") + ", " + x.getString("C") + ", " +
x.getString("D")).cache()
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
val dataFrame = rddData.toDF
dataFrame.registerTempTable("Data")
sqlContext.sql("select A from Data").collect.foreach(println)
当我运行此代码时,它会抛出一个错误:无法解析给定输入列 _1 的"代码";
但是当我运行时: sqlContext.sql("select * from Data").collect.foreach(println)它打印所有列A,B,C,D
我相信我没有在我创建的 JdbcRDD 中获取列名,因此它们在临时表中无法访问。我需要帮助。
问题是你创建了JdbcRDD对象,你需要DataFrame。RDD simple 不包含有关从元组到列名的映射的信息。因此,您应该从jdbc源创建数据帧,如编程指南中所述 此外:
Spark SQL还包括一个数据源,可以从其他数据源读取数据 使用 JDBC 的数据库。此功能应优先于 使用 JdbcRDD
另请注意,数据帧是在 Spark 1.3.0 中添加的。如果您使用旧版本,则必须使用org.apache.spark.sql.SchemaRDD