我想通过spark创建Row number(row_num
)作为MySql中现有表的列,用于并行读取数据库(即,由于表中的所有列都是String,因此对列进行分区)。
当我尝试执行这个查询时:
val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init
我得到了一个例外,如下所示:
17/10/16 10:50:00 INFO SparkSqlParser: Parsing command: SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'SELECT @'(line 1, pos 7)
== SQL ==
SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
-------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at com.syntel.spark.sparkDVT$.main(sparkDVT.scala:61)
at com.syntel.spark.sparkDVT.main(sparkDVT.scala)
我尝试过的代码:
val p2 = "@row_number"
val a = s"""SELECT $p2:=$p2+1 as rowid,d.* FROM destination d, (SELECT $p2:=0) as init"""
val df1 = spark.sql(a)
参考:
https://forums.databricks.com/questions/115/how-do-i-pass-parameters-to-my-sql-statements.html
如何在mysql 中执行以下来自spark的查询
val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init
谢谢
我想通过spark 在MySql中创建行号(Row_num)作为现有表的列
row_number函数
使用行号:
row_number():Column窗口函数:返回窗口分区内从1开始的序列号。
您可以按如下方式使用它:
val input = spark.range(10)
scala> input.printSchema
root
|-- id: long (nullable = false)
import org.apache.spark.sql.expressions.Window
val byId = Window.orderBy($"id".asc)
scala> input.withColumn("index", row_number over byId).show
17/10/16 08:27:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-----+
| id|index|
+---+-----+
| 0| 1|
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
| 7| 8|
| 8| 9|
| 9| 10|
+---+-----+
但是要小心,因为它是一个窗口函数,需要一个有序的窗口,并将属于一个窗口分区的所有行移动到一个Spark分区,如警告所示:
2016年10月17日08:27:01警告WindowExec:没有为Window操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。
这意味着对于一个非常大的数据集,您可能会产生很长的GC,甚至由于OutOfMemoryError而根本无法完成。
单调递增id()函数
还有另一个函数单调地_incresing_id:
单调递增_id():列生成单调递增64位整数的列表达式。
请注意。。。
生成的ID保证单调递增且唯一,但不是连续的。当前的实现将分区ID放在高31位,将每个分区内的记录号放在低33位。假设数据帧有不到10亿个分区,每个分区有不到80亿条记录。
如果要执行mySQL
查询,则需要使用标准JDBC API。
Spark SQL API与DataFrame ou DataSet(Spark Object)相关。
主题How do I pass parameters to my SQL statements?
不讨论mySql
而是讨论my SQL