如何实现sparksql分页查询



有人知道如何在sparksql查询中进行分页吗?

我需要使用spark-sql,但不知道如何进行分页。

尝试:

select * from person limit 10, 10

已经6年了,不知道当时是否可能。

我会在答案上添加一个顺序id,并搜索偏移量和偏移量+限制之间的寄存器

在纯Spark SQL查询中,偏移量为10,限制为10的是这样的

WITH count_person AS (
    SELECT *, monotonically_increasing_id() AS count FROM person)
SELECT * FROM count_person WHERE count > 10 AND count < 20

在PySpark上,它与非常相似

import pyspark.sql.functions as F
offset = 10
limit = 10
df = df.withColumn('_id', F.monotonically_increasing_id())
df = df.where(F.col('_id').between(offset, offset + limit))

即使对于大数据量,它也足够灵活和快速。

如果数据帧中有重复的行,那么karthik的回答将失败except’将删除df1中在df2中的所有行。
val filteredRdd = df.rdd.zipWithIndex().collect { case (r, i) if 10 >= start && i <=20 => r }
val newDf = sqlContext.createDataFrame(filteredRdd, df.schema)

到目前为止,spark sql中还不支持偏移量。您可以使用except方法通过DataFrames进行分页。

示例:如果要以10的分页限制进行迭代,可以执行以下操作:

    DataFrame df1;
    long count = df.count();
    int limit = 10;
    while(count > 0){
        df1 = df.limit(limit);
        df1.show();            //will print 10, next 10, etc rows
        df = df.except(df1);
        count = count - limit;
    }

如果你想在第一次尝试时说LIMIT 50, 100,你可以做以下操作:

        df1 = df.limit(50);
        df2 = df.except(df1);
        df2.limit(100);       //required result

希望这能有所帮助!

请在下面找到一个名为SparkPaging的有用PySpark(Python 3和Spark 3)类,它抽象了分页机制:https://gitlab.com/enahwe/public/lib/spark/sparkpaging

用法如下:

SparkPaging

用于分页数据帧和数据集的类

示例

-初始化示例1:

通过指定限制来接近。

sp = SparkPaging(initData=df, limit=753)

-初始化示例2:

方法是指定页数(如果有剩余,页数将递增)。

sp = SparkPaging(initData=df, pages=6)

-初始化示例3:

通过指定限制来接近。

sp = SparkPaging()
sp.init(initData=df, limit=753)

-初始化示例4:

方法是指定页数(如果有剩余,页数将递增)。

sp = SparkPaging()
sp.init(initData=df, pages=6)

-重置:

sp.reset()

-迭代示例:

print("- Total number of rows = " + str(sp.initDataCount))
print("- Limit = " + str(sp.limit))
print("- Number of pages = " + str(sp.pages))
print("- Number of rows in the last page = " + str(sp.numberOfRowsInLastPage))
while (sp.page < sp.pages-1):
    df_page = sp.next()
    nbrRows = df_page.count()
    print("  Page " + str(sp.page) + '/' + str(sp.pages) + ": Number of rows = " + str(nbrRows))

-输出:

- Total number of rows = 4521
- Limit = 753
- Number of pages = 7
- Number of rows in the last page = 3
    Page 0/7: Number of rows = 753
    Page 1/7: Number of rows = 753
    Page 2/7: Number of rows = 753
    Page 3/7: Number of rows = 753
    Page 4/7: Number of rows = 753
    Page 5/7: Number of rows = 753
    Page 6/7: Number of rows = 3

相关内容

  • 没有找到相关文章

最新更新