Pyspark,数据框架的顶部



我想做的是一个数据框,请根据某些指定的列获得顶级元素。RDD API中的顶部(self,num)正是我想要的。我想知道DataFrame World中是否有同等的API?

我的第一次尝试是以下

def retrieve_top_n(df, n):
    # assume we want to get most popular n 'key' in DataFrame
    return df.groupBy('key').count().orderBy('count', ascending=False).limit(n).select('key')

但是,我已经意识到这导致了非确定性行为(我不知道确切原因,但我猜限制(n)不能保证要采用哪个n)

首先,让我们定义一个函数以生成测试数据:

import numpy as np
def sample_df(num_records):
    def data():
      np.random.seed(42)
      while True:
          yield int(np.random.normal(100., 80.))
    data_iter = iter(data())
    df = sc.parallelize((
        (i, next(data_iter)) for i in range(int(num_records))
    )).toDF(('index', 'key_col'))
    return df
sample_df(1e3).show(n=5)
+-----+-------+
|index|key_col|
+-----+-------+
|    0|    139|
|    1|     88|
|    2|    151|
|    3|    221|
|    4|     81|
+-----+-------+
only showing top 5 rows


现在,让我们提出三种计算Topk的方法:

from pyspark.sql import Window
from pyspark.sql import functions

def top_df_0(df, key_col, K):
    """
    Using window functions.  Handles ties OK.
    """
    window = Window.orderBy(functions.col(key_col).desc())
    return (df
            .withColumn("rank", functions.rank().over(window))
            .filter(functions.col('rank') <= K)
            .drop('rank'))

def top_df_1(df, key_col, K):
    """
    Using limit(K). Does NOT handle ties appropriately.
    """
    return df.orderBy(functions.col(key_col).desc()).limit(K)

def top_df_2(df, key_col, K):
    """
    Using limit(k) and then filtering.  Handles ties OK."
    """
    num_records = df.count()
    value_at_k_rank = (df
                       .orderBy(functions.col(key_col).desc())
                       .limit(k)
                       .select(functions.min(key_col).alias('min'))
                       .first()['min'])
    return df.filter(df[key_col] >= value_at_k_rank)

称为top_df_1的功能类似于您最初实现的功能。它为您提供非确定性行为的原因是因为它不能很好地处理联系。如果您有很多数据,并且仅出于性能而对大概答案感兴趣,这可能是一件可以的事情。


最后,让我们进行基准测试

用于基准测试,使用带有400万个条目的火花DF并定义便利功能:

NUM_RECORDS = 4e6
test_df = sample_df(NUM_RECORDS).cache()
def show(func, df, key_col, K):
    func(df, key_col, K).select(
      functions.max(key_col),
      functions.min(key_col),
      functions.count(key_col)
    ).show()


让我们看看判决:

%timeit show(top_df_0, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           108|
+------------+------------+--------------+
1 loops, best of 3: 1.62 s per loop

%timeit show(top_df_1, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           100|
+------------+------------+--------------+
1 loops, best of 3: 252 ms per loop

%timeit show(top_df_2, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           108|
+------------+------------+--------------+
1 loops, best of 3: 725 ms per loop

(请注意,top_df_0top_df_2在前100名中有108个条目。这是由于存在第100个最佳的绑定条目。top_df_1实现忽略了绑定的条目。)。


底线

如果您想使用top_df_2的确切答案(比top_df_0好约2倍)。如果您想要另一个X2的性能,并且可以使用top_df_1的大致答案。

选项:

1)在窗口函数中使用pyspark sql row_number-相关的so:Spark DataFrame分组,排序和为一组列选择顶行

2)将订购的DF转换为rdd并在那里使用顶部功能(提示:这似乎实际上并没有从我的快速测试中维护订单,但是ymmv)

您应该尝试使用head()而不是limit()

#sample data
df = sc.parallelize([
    ['123', 'b'], ['666', 'a'],
    ['345', 'd'], ['555', 'a'],
    ['456', 'b'], ['444', 'a'],
    ['678', 'd'], ['333', 'a'],
    ['135', 'd'], ['234', 'd'],
    ['987', 'c'], ['987', 'e']
]).toDF(('col1', 'key_col'))
#select top 'n' 'key_col' values from dataframe 'df'
def retrieve_top_n(df, key, n):
    return sqlContext.createDataFrame(df.groupBy(key).count().orderBy('count', ascending=False).head(n)).select(key)
retrieve_top_n(df, 'key_col', 3).show()

希望这会有所帮助!

相关内容

  • 没有找到相关文章

最新更新