PySpark: how to count the occurrences of strings within each group and print multiple selected columns?



My dataset looks like this. Each row represents a car. Every car belongs to an Auto Center and has a Model, a Make, and a number of other attributes. Below is a simplified version of the dataframe; irrelevant rows and columns are omitted for clarity.

+===========+========+=======+====+=====+
|Auto Center|Model   |Make   |Year|Color|
+===========+========+=======+====+=====+
|Roseville  |Corolla |Toyota |    |     |
|Roseville  |Prius   |Toyota |    |     |
|Rocklin    |Camry   |Toyota |    |     |
|Rocklin    |Forester|Subaru |    |     |
+===========+========+=======+====+=====+

What do I want to do? I want to group the data by Auto Center, show a "list" of the top 5 cars in each Auto Center by count, and print their Make, Model, Year, and Color attributes.

After grouping the data by Auto Center, I want to count the occurrences of each Model, or better yet of each Make and Model combination, within each Auto Center, and get a list of the top 5 cars by number of occurrences. Then I want to print several columns for those cars.

Assume that every car with the same Make and Model also has the same Year and Color.

For example, the output should look something like this, a list of the top 5 cars for each Auto Center, ordered by number of occurrences:

Roseville:
there are 12 red Toyota Prius 2009
there are 8 blue Toyota Camry 2010
...

Here is what I have so far:

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

sc = SparkContext()
sqlContext = SQLContext(sc)
scSpark = SparkSession \
    .builder \
    .appName("Auto Center Big Data") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

data = scSpark.read.csv("autocenters.csv", header=True, inferSchema=True)
data.printSchema()
data.groupby('Auto Center')

It seems that data.groupby() returns a GroupedData object. It looks like the .agg() function can be applied to it, but that appears to only work on numeric data, e.g. finding the average of some numbers, whereas here I have strings. I want to count the strings by the number of times they occur in each group.

What should I do? Is there a way to apply an aggregation function to several columns at once, e.g. to Make and Model together? If not, that should still be fine, given that no two cars with the same Model have different Makes.

IIUC, you can get this done in the following two steps:

  1. First group by all the columns whose occurrences you want to count:

    df1 = df.groupby('Auto Center', 'Model', 'Make', 'Year', 'Color').count()
    
  2. Then set up a Window spec and take the top 5 using row_number(). (Note: depending on how you want ties to be handled, you may want to change row_number() to rank() or dense_rank().)

    from pyspark.sql import Window
    from pyspark.sql.functions import row_number, desc
    w1 = Window.partitionBy('Auto Center').orderBy(desc('count'))
    df_new = df1.withColumn('rn', row_number().over(w1)).where('rn <= 5').drop('rn')
    
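For example, if you want ties to share a rank (so that a center may return more than 5 rows when counts are equal), a minimal variant of step 2 could use dense_rank() instead:

    from pyspark.sql import Window
    from pyspark.sql.functions import dense_rank, desc

    # dense_rank() assigns the same rank to tied counts, with no gaps in the ranking
    w1 = Window.partitionBy('Auto Center').orderBy(desc('count'))
    df_new = df1.withColumn('rn', dense_rank().over(w1)).where('rn <= 5').drop('rn')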

Here is a complete example. First set up a local Spark context and session:

import pyspark

from pyspark.sql import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

Then create a random dataframe similar to yours:

import string
import random
n = 1000
center = ['center_{}'.format(random.choice(string.ascii_letters[:2])) for y in range(n)]
make = ['make_{}'.format(random.choice(string.ascii_letters[:3])) for y in range(n)]
model = ['model_{}'.format(random.choice(string.ascii_letters[:3])) for y in range(n)]
year = [random.choice(range(2018, 2019)) for y in range(n)]
color = [random.choice(['red', 'blue', 'black']) for y in range(n)]
df = spark.createDataFrame(zip(center, make, model, year, color), schema=['center', 'make', 'model', 'year', 'color'])
df.head()
Row(center='center_b', make='make_a', model='model_c', year=2018, color='black')

Group by center, make, model, year, and color:

df_groupby = (df
    .groupby('center', 'make', 'model', 'year', 'color')
    .count()
)
df_groupby.sort(df_groupby['center'],
                df_groupby['count'].desc()).show()
+--------+------+-------+----+-----+-----+
|  center|  make|  model|year|color|count|
+--------+------+-------+----+-----+-----+
|center_a|make_c|model_b|2018| blue|   33|
|center_a|make_a|model_c|2018| blue|   24|
|center_a|make_a|model_a|2018|  red|   23|
|center_a|make_b|model_c|2018| blue|   21|
|center_a|make_b|model_b|2018|black|   21|
|center_a|make_c|model_a|2018|black|   21|
|center_a|make_c|model_c|2018| blue|   21|
|center_a|make_a|model_c|2018|black|   20|
|center_a|make_a|model_b|2018|  red|   20|
|center_a|make_a|model_b|2018| blue|   19|
|center_a|make_c|model_c|2018|black|   18|
|center_a|make_a|model_c|2018|  red|   18|
|center_a|make_c|model_b|2018|  red|   18|
|center_a|make_b|model_b|2018|  red|   18|
|center_a|make_c|model_a|2018|  red|   18|
|center_a|make_a|model_b|2018|black|   18|
|center_a|make_b|model_c|2018|black|   18|
|center_a|make_a|model_a|2018| blue|   17|
|center_a|make_c|model_a|2018| blue|   17|
|center_a|make_c|model_b|2018|black|   15|
+--------+------+-------+----+-----+-----+
only showing top 20 rows
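On the .agg() concern from the question: counting is not limited to numeric columns. The same per-group counts can also be produced with an explicit aggregate, for example (a small sketch reusing the df built above):

from pyspark.sql import functions as F

# F.count('*') counts rows per group regardless of the column types,
# so grouping on string columns is not a problem
df_groupby_alt = (df
    .groupby('center', 'make', 'model', 'year', 'color')
    .agg(F.count('*').alias('count'))
)
df_groupby_alt.sort('center', F.desc('count')).show()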

Using a window, keep only the top 5 make/model/color/year combinations per center:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

w = Window.partitionBy('center').orderBy(desc('count'))
df_groupby2 = df_groupby.withColumn('rn', row_number().over(w)).where('rn <= 5').drop('rn')
df_groupby2.sort(df_groupby2['center'],
                 df_groupby2['count'].desc()).show()
+--------+------+-------+----+-----+-----+
|  center|  make|  model|year|color|count|
+--------+------+-------+----+-----+-----+
|center_a|make_c|model_b|2018| blue|   33|
|center_a|make_a|model_c|2018| blue|   24|
|center_a|make_a|model_a|2018|  red|   23|
|center_a|make_b|model_b|2018|black|   21|
|center_a|make_c|model_a|2018|black|   21|
|center_b|make_a|model_a|2018|  red|   31|
|center_b|make_c|model_c|2018|black|   24|
|center_b|make_b|model_a|2018| blue|   24|
|center_b|make_b|model_b|2018|black|   23|
|center_b|make_c|model_c|2018| blue|   23|
+--------+------+-------+----+-----+-----+

Now build and print your text:

from pyspark.sql import functions as F

df_final = (df_groupby2
    .withColumn('text', F.concat(F.lit("there are "),
                                 df_groupby2['count'],
                                 F.lit(" "),
                                 df_groupby2['color'],
                                 F.lit(" "),
                                 df_groupby2['make'],
                                 F.lit(" "),
                                 df_groupby2['model'],
                                 F.lit(" "),
                                 df_groupby2['year']))
    .sort(df_groupby2['center'],
          df_groupby2['count'].desc())
)

# Print the top-5 rows center by center
for row in df_final.select('center').distinct().sort('center').collect():
    current_center = row['center']
    print(current_center, ":")
    for car in df_final.filter(df_final['center'] == current_center).collect():
        print(car['text'])
center_a :
there are 33 blue make_c model_b 2018
there are 24 blue make_a model_c 2018
there are 23 red make_a model_a 2018
there are 21 black make_b model_b 2018
there are 21 black make_c model_a 2018
center_b :
there are 31 red make_a model_a 2018
there are 24 blue make_b model_a 2018
there are 24 black make_c model_c 2018
there are 23 black make_b model_b 2018
there are 23 blue make_c model_c 2018
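As a side note, the same text column can be built a bit more concisely with concat_ws(), which inserts the separator between the columns for you (a small sketch reusing df_groupby2 from above; the casts just make the integer-to-string conversion explicit):

from pyspark.sql import functions as F

# concat_ws(separator, *cols) joins the given columns with a single separator
df_final_alt = df_groupby2.withColumn(
    'text',
    F.concat_ws(' ',
                F.lit('there are'),
                F.col('count').cast('string'),
                F.col('color'),
                F.col('make'),
                F.col('model'),
                F.col('year').cast('string'))
)
df_final_alt.show(5, truncate=False)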
