I am trying to implement this query in Spark SQL using Scala:
SELECT * FROM employees
WHERE emp_id IN (SELECT emp_id
FROM employees
WHERE SALARY > 200) ;
I have mapped the Oracle table to a Spark DataFrame:
val employees = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:client/password@localhost:1521:orcldb1", "dbtable" -> "client.EMPLOYEE"));
The subquery returns the following:
scala> employees.where($"salary" > 100).select($"emp_id").collect().map{ row=>row.get(0)}
res3: Array[Any] = Array(6, 7, 8, 9, 10, 4, 2, 3, 5)
When I run the whole query, I get the following error:
employees.where($"emp_id" in (employees.where($"salary" > 100).select($"emp_id").collect())).show
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
java.lang.RuntimeException: Unsupported literal type class [Ljava.lang.Object; [Ljava.lang.Object;@129df247
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:49)
at org.apache.spark.sql.functions$.lit(functions.scala:89)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
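The cause can be reproduced without Spark. Column.in and Column.isin take varargs (list: Any*), so an array passed as a single argument arrives as one element, and Spark then tries to turn the whole array into one literal. The sketch below uses a hypothetical helper, describeArgs, as a stand-in for such a varargs method; it is plain Scala and not part of the Spark API.

```scala
object VarargsDemo {
  // Hypothetical stand-in for a varargs method such as Column.isin(list: Any*):
  // returns the runtime class name of each argument it received.
  def describeArgs(list: Any*): List[String] =
    list.map(_.getClass.getSimpleName).toList

  def main(args: Array[String]): Unit = {
    val ids: Array[Any] = Array(6, 7, 8)

    // Passed as-is, the whole array is ONE argument (an Object[]).
    println(describeArgs(ids))
    // Expanded with :_*, each element becomes a separate argument.
    println(describeArgs(ids: _*))
  }
}
```

The first call sees a single Object[] argument; the second sees three boxed Integer arguments, which is the shape isin needs.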
I found the answer:
val items = employees.where($"salary" > 100).select($"emp_id").collect().map{ row=>row.get(0)}
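Apart from extracting the value from each Row, the change was to use isin instead of the deprecated in, expanding the array into separate arguments with items:_*. Passing the array as a single argument makes Spark try to build one literal from the whole array, which is what caused the "Unsupported literal type" error: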
employees.filter($"emp_id".isin(items:_*)).show
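The isin approach collects the IDs to the driver first, which is fine for a small list. An alternative that keeps everything inside Spark is a left semi join, the usual DataFrame equivalent of WHERE emp_id IN (subquery). This is a minimal sketch, assuming Spark 2.x with a SparkSession (the three-argument join(right, usingColumns, joinType) overload should also exist on a Spark 1.6 DataFrame). The helper name employeesInSubquery and the toy data are hypothetical, standing in for the Oracle EMPLOYEE table.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SemiJoinDemo {
  // Hypothetical helper: keeps the rows of `employees` whose emp_id appears in
  // SELECT emp_id FROM employees WHERE salary > threshold.
  // A left semi join returns only the left side's columns, one row per match,
  // and nothing is collected to the driver.
  def employeesInSubquery(employees: DataFrame, threshold: Double): DataFrame = {
    val ids = employees.where(employees("salary") > threshold).select("emp_id")
    employees.join(ids, Seq("emp_id"), "leftsemi")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("semi-join-demo")
      .getOrCreate()
    import spark.implicits._

    // Toy data (hypothetical values) in place of the JDBC-loaded table.
    val employees = Seq((1, 50.0), (2, 150.0), (3, 300.0)).toDF("emp_id", "salary")
    employeesInSubquery(employees, 100).show()

    spark.stop()
  }
}
```

With this data only employees 2 and 3 are kept, matching what the collect-and-isin version would return.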