如何将行号生成为现有表的列



我想通过spark创建Row number(row_num)作为MySql中现有表的列,用于并行读取数据库(即,由于表中的所有列都是String,因此对列进行分区)。

当我尝试执行这个查询时:

val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init

我得到了一个例外,如下所示:

17/10/16 10:50:00 INFO SparkSqlParser: Parsing command: SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'SELECT @'(line 1, pos 7)
== SQL ==
SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
-------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at com.syntel.spark.sparkDVT$.main(sparkDVT.scala:61)
at com.syntel.spark.sparkDVT.main(sparkDVT.scala)

我尝试过的代码:

val p2 = "@row_number"
val a = s"""SELECT $p2:=$p2+1 as rowid,d.* FROM destination d, (SELECT $p2:=0) as init"""
val df1 = spark.sql(a)

参考:

https://forums.databricks.com/questions/115/how-do-i-pass-parameters-to-my-sql-statements.html

如何在mysql 中执行以下来自spark的查询

val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init

谢谢

我想通过spark 在MySql中创建行号(Row_num)作为现有表的列

row_number函数

使用行号:

row_number():Column窗口函数:返回窗口分区内从1开始的序列号。

您可以按如下方式使用它:

val input = spark.range(10)
scala> input.printSchema
root
|-- id: long (nullable = false)
import org.apache.spark.sql.expressions.Window
val byId = Window.orderBy($"id".asc)
scala> input.withColumn("index", row_number over byId).show
17/10/16 08:27:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-----+
| id|index|
+---+-----+
|  0|    1|
|  1|    2|
|  2|    3|
|  3|    4|
|  4|    5|
|  5|    6|
|  6|    7|
|  7|    8|
|  8|    9|
|  9|   10|
+---+-----+

但是要小心,因为它是一个窗口函数,需要一个有序的窗口,并将属于一个窗口分区的所有行移动到一个Spark分区,如警告所示:

2016年10月17日08:27:01警告WindowExec:没有为Window操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。

这意味着对于一个非常大的数据集,您可能会产生很长的GC,甚至由于OutOfMemoryError而根本无法完成。

单调递增id()函数

还有另一个函数单调地_incresing_id:

单调递增_id():列生成单调递增64位整数的列表达式。

请注意。。。

生成的ID保证单调递增且唯一,但不是连续的。当前的实现将分区ID放在高31位,将每个分区内的记录号放在低33位。假设数据帧有不到10亿个分区,每个分区有不到80亿条记录。

如果要执行mySQL查询,则需要使用标准JDBC API。

Spark SQL API与DataFrame ou DataSet(Spark Object)相关。

主题How do I pass parameters to my SQL statements?不讨论mySql而是讨论my SQL

相关内容

  • 没有找到相关文章

最新更新