如何选择最后一行,以及如何通过索引访问PySpark数据帧



从类似的PySpark SQL数据帧

name age city
abc   20  A
def   30  B

如何到达最后一排。(就像df.limit(1(一样,我可以将数据帧的第一行放入新的数据帧中(。

以及我如何通过索引访问数据帧行,如第12行或第200行。

在熊猫身上我可以做

df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or by df.iloc[]

我只是好奇如何以这种方式或替代方式访问pyspark数据帧。

感谢

如何到达最后一排。

如果您有一个可以用于排序数据帧的列,例如"索引",那么获取最后一条记录的一种简单方法是使用SQL:1( 按降序排列表格2( 从该订单中取第一个值

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()

以及我如何通过索引访问数据帧行,如第12行或第200行。

类似的方式,你可以在任何一行中获得记录

row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))
latest_rec.show()

如果您没有"索引"列,您可以使用创建它

from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("index", monotonically_increasing_id())

如何到达最后一排。

假设所有列都是现代的漫长而丑陋的方式:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)
last_row = (df
    .withColumn("_id", monotonically_increasing_id())
    .select(max(struct("_id", *df.columns))
    .alias("tmp")).select(col("tmp.*"))
    .drop("_id"))

如果不是所有列都可以订购,您可以尝试:

with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]
with_id.where(col("_id") == i).drop("_id")

注意。pyspark.sql.functions/`o.a.s.sql.functions中有last函数,但考虑到对相应表达式的描述,这里不是一个好的选择。

如何通过类似的索引访问数据帧行

你不能。Spark DataFrame,可通过索引访问。您可以使用zipWithIndex添加索引,稍后进行筛选。请记住这个O(N(操作。

from pyspark.sql import functions as F
expr = [F.last(col).alias(col) for col in df.columns]
df.agg(*expr)

只是一个提示:看起来你仍然有处理熊猫或R的人的心态。Spark是我们处理数据的另一种模式。你不再访问单个单元格中的数据,现在你处理的是整块数据。如果你像刚才那样不断收集数据并采取行动,你就会失去spark提供的并行性的整个概念。看看Spark中转换与操作的概念。

使用以下内容获取包含单调递增、唯一、连续整数的索引列,这不是monotonically_increasing_id()的工作方式。索引将按照与DataFrame的colName相同的顺序递增。

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df = df
 .withColumn('int', F.lit(1))
 .withColumn('index', F.sum('int').over(window))
 .drop('int')

使用以下代码查看DataFrame的尾部或最后一个rownums

rownums = 10
df.where(F.col('index')>df.count()-rownums).show()

使用以下代码查看DataFrame中从start_rowend_row的行。

start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()

zipWithIndex()是一种RDD方法,它确实返回单调递增、唯一和连续的整数,但在某种程度上实现起来要慢得多,因为您可以返回到用id列修改的原始DataFrame。

相关内容

  • 没有找到相关文章

最新更新