从 Spark 数据帧获取特定行



在 scala spark 数据帧中是否有任何替代方案可以替代df[100, c("column")]。我想从一列火花数据框中选择特定行。例如100th上面 R 等效代码中的行

首先,

您必须了解DataFrames是分布式的,这意味着您无法以典型的过程方式访问它们,您必须首先进行分析。虽然,你问的是Scala我建议你阅读 Pyspark 文档,因为它比任何其他文档都有更多的示例。

但是,继续我的解释,我将使用 RDD API 的一些方法,因为所有DataFrame都有一个RDD作为属性。请看下面我的例子,注意我是如何取得第二条记录的。

df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
myIndex = 1
values = (df.rdd.zipWithIndex()
            .filter(lambda ((l, v), i): i == myIndex)
            .map(lambda ((l,v), i): (l, v))
            .collect())
print(values[0])
# (u'b', 2)

希望有人用更少的步骤给出另一种解决方案。

这就是我在 Scala 中实现相同目标的方式。我不确定它是否比有效答案更有效,但它需要更少的编码

val parquetFileDF = sqlContext.read.parquet("myParquetFule.parquet")
val myRow7th = parquetFileDF.rdd.take(7).last

在 PySpark 中,如果你的数据集很小(可以放入驱动程序的内存中),你可以

df.collect()[n]

其中df是数据帧对象,n是感兴趣的行。获得上述行后,您可以执行row.myColumnrow["myColumn"]来获取内容,如 API 文档中所述。

下面的getrows()函数应该得到你想要的特定行。

为了完整起见,我写下了完整的代码以重现输出。

# Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('scratch').getOrCreate()
# Create the dataframe
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
# Function to get rows at `rownums`
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])
# Get rows at positions 0 and 2.
getrows(df, rownums=[0, 2]).collect()
# Output:
#> [(Row(letter='a', name=1)), (Row(letter='c', name=3))]

这在 PySpark 中对我有用

df.select("column").collect()[0][0]

有一种 scala 方法(如果你在工作机器上有足够的内存):

val arr = df.select("column").rdd.collect
println(arr(100))

如果数据帧架构未知,并且您知道"column"字段的实际类型(例如双精度),则可以获得如下arr

val arr = df.select($"column".cast("Double")).as[Double].rdd.collect
你可以

简单地使用以下一行代码来做到这一点

val arr = df.select("column").collect()(99)

当您想从数据帧中获取日期列的最大值时,只需获取没有对象类型或行对象信息的值,可以参考以下代码。

= "我的表"

max_date = df.select(max('date_col')).first()[0]

2020-06-26<</strong>/>而不是 Row(max(reference_week)=datetime.date(2020, 6, 26))

以下是Java-Spark的方法,1)添加按顺序递增的列。 2) 使用 ID 选择行号。 3) 删除列

import static org.apache.spark.sql.functions.*;
..
ds = ds.withColumn("rownum", functions.monotonically_increasing_id());
ds = ds.filter(col("rownum").equalTo(99));
ds = ds.drop("rownum");

注意:monotonically_increasing_id从0开始;

相关内容

  • 没有找到相关文章

最新更新