My dataset looks like this. Each row represents one car. Every car belongs to an Auto Center and has a Model, a Make, and a number of other attributes. Below is a simplified version of the DataFrame; irrelevant rows and columns are omitted for clarity.
+===========+========+=======+====+=====+
|Auto Center|Model |Make |Year|Color|
+===========+========+=======+====+=====+
|Roseville |Corolla |Toyota | | |
|Roseville |Prius |Toyota | | |
|Rocklin |Camry |Toyota | | |
|Rocklin |Forester|Subaru | | |
+===========+========+=======+====+=====+
What do I want to do? I want to group the data by Auto Center and, for each Auto Center, show a "top 5" list of the most common cars by count, printing their Make, Model, Year, and Color attributes.
After grouping the data by Auto Center, I want to count the occurrences of each Model (or, better yet, of each Make and Model combination) within each Auto Center, and get a list of the top 5 cars with the most occurrences. Then I want to print multiple columns for each of those cars.
Assume that every car with the same Make and Model also has the same Year and Color.
For example, the output should look like this: for each Auto Center, a list of its top 5 cars sorted by occurrence count.
Roseville:
there are 12 red Toyota Prius 2009
there are 8 blue Toyota Camry 2010
...
Here is what I have so far:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

sc = SparkContext()
scSpark = SparkSession \
    .builder \
    .appName("Auto Center Big Data") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

data = scSpark.read.csv("autocenters.csv", header=True, inferSchema=True)
data.printSchema()
data.groupby('Auto Center')
It seems that data.groupby() returns a GroupedData object. The .agg() function can apparently be applied to it, but that seems to work only on numeric data, e.g. finding the mean of some numbers, and here I have strings. I want to count the strings by their number of occurrences in each group.
What should I do? Is there a way to apply an aggregation function to multiple columns at once, e.g. to Make and Model together? If not, that should still be fine, given that no cars with the same Model have different Makes.
IIUC, you can do this in the following two steps:
- First, group by all the columns you want occurrence counts for:
df1 = df.groupby('Auto Center', 'Model', 'Make', 'Year', 'Color').count()
- Then set up a Window spec and take the top 5 rows per group with row_number(). (Note: depending on how you want to handle ties, you may want to change row_number() to rank() or dense_rank().)
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

w1 = Window.partitionBy('Auto Center').orderBy(desc('count'))
df_new = df1.withColumn('rn', row_number().over(w1)).where('rn <= 5').drop('rn')
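The difference between the three ranking functions only matters when counts tie. A small pure-Python sketch of their semantics (not Spark code, just the tie-handling rules, over a hypothetical list of per-group counts) may make the choice concrete:

```python
# Sketch of the tie-handling semantics of Spark's row_number(),
# rank(), and dense_rank() over counts sorted descending.
counts = [33, 24, 24, 21, 21, 18]  # hypothetical counts, ties at 24 and 21

row_number = list(range(1, len(counts) + 1))  # ties broken arbitrarily

rank = []          # ties share a rank, following rank skips
dense_rank = []    # ties share a rank, no gaps
for i, c in enumerate(counts):
    if i > 0 and c == counts[i - 1]:
        rank.append(rank[-1])
        dense_rank.append(dense_rank[-1])
    else:
        rank.append(i + 1)
        dense_rank.append(dense_rank[-1] + 1 if dense_rank else 1)

print(row_number)  # [1, 2, 3, 4, 5, 6]
print(rank)        # [1, 2, 2, 4, 4, 6]
print(dense_rank)  # [1, 2, 2, 3, 3, 4]
```

With `where('rn <= 5')`, row_number() guarantees exactly 5 rows per group, while rank() or dense_rank() may keep more (or fewer) rows when ties cross the cutoff.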
import pyspark
from pyspark.sql import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
Then create a random DataFrame similar to yours:
import string
import random
n = 1000
center = ['center_{}'.format(random.choice(string.ascii_letters[:2])) for y in range(n)]
make = ['make_{}'.format(random.choice(string.ascii_letters[:3])) for y in range(n)]
model = ['model_{}'.format(random.choice(string.ascii_letters[:3])) for y in range(n)]
year = [random.choice(range(2018, 2019)) for y in range(n)]
color = [random.choice(['red', 'blue', 'black']) for y in range(n)]
df = spark.createDataFrame(zip(center, make, model, year, color), schema=['center', 'make', 'model', 'year', 'color'])
df.head()
Row(center='center_b', make='make_a', model='model_c', year=2018, color='black')
Group by center, make, model, year, and color:
df_groupby = (df
.groupby('center', 'make', 'model', 'year', 'color')
.count()
)
df_groupby.sort(df_groupby['center'],
df_groupby['count'].desc()).show()
+--------+------+-------+----+-----+-----+
| center| make| model|year|color|count|
+--------+------+-------+----+-----+-----+
|center_a|make_c|model_b|2018| blue| 33|
|center_a|make_a|model_c|2018| blue| 24|
|center_a|make_a|model_a|2018| red| 23|
|center_a|make_b|model_c|2018| blue| 21|
|center_a|make_b|model_b|2018|black| 21|
|center_a|make_c|model_a|2018|black| 21|
|center_a|make_c|model_c|2018| blue| 21|
|center_a|make_a|model_c|2018|black| 20|
|center_a|make_a|model_b|2018| red| 20|
|center_a|make_a|model_b|2018| blue| 19|
|center_a|make_c|model_c|2018|black| 18|
|center_a|make_a|model_c|2018| red| 18|
|center_a|make_c|model_b|2018| red| 18|
|center_a|make_b|model_b|2018| red| 18|
|center_a|make_c|model_a|2018| red| 18|
|center_a|make_a|model_b|2018|black| 18|
|center_a|make_b|model_c|2018|black| 18|
|center_a|make_a|model_a|2018| blue| 17|
|center_a|make_c|model_a|2018| blue| 17|
|center_a|make_c|model_b|2018|black| 15|
+--------+------+-------+----+-----+-----+
only showing top 20 rows
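Conceptually, this groupby-count step just tallies each distinct (center, make, model, year, color) tuple. As an illustration only (assuming a tiny hand-made sample that fits in memory, not the Spark API), the same idea in plain Python is a collections.Counter over row tuples:

```python
from collections import Counter

# Tiny in-memory stand-in for the DataFrame rows (hypothetical sample).
rows = [
    ('center_a', 'make_c', 'model_b', 2018, 'blue'),
    ('center_a', 'make_c', 'model_b', 2018, 'blue'),
    ('center_a', 'make_a', 'model_c', 2018, 'blue'),
    ('center_b', 'make_a', 'model_a', 2018, 'red'),
]

# Equivalent of df.groupby(...).count(): one count per distinct tuple.
counts = Counter(rows)
for key, n in counts.most_common():
    print(key, n)
```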
Using a window, keep only the top 5 make/model/color/year combinations per center:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc
w = Window.partitionBy('center').orderBy(desc('count'))
df_groupby2 = df_groupby.withColumn('rn', row_number().over(w)).where('rn <= 5').drop('rn')
df_groupby2.sort(df_groupby2['center'],
df_groupby2['count'].desc()
).show()
+--------+------+-------+----+-----+-----+
| center| make| model|year|color|count|
+--------+------+-------+----+-----+-----+
|center_a|make_c|model_b|2018| blue| 33|
|center_a|make_a|model_c|2018| blue| 24|
|center_a|make_a|model_a|2018| red| 23|
|center_a|make_b|model_b|2018|black| 21|
|center_a|make_c|model_a|2018|black| 21|
|center_b|make_a|model_a|2018| red| 31|
|center_b|make_c|model_c|2018|black| 24|
|center_b|make_b|model_a|2018| blue| 24|
|center_b|make_b|model_b|2018|black| 23|
|center_b|make_c|model_c|2018| blue| 23|
+--------+------+-------+----+-----+-----+
Now build and print your text:
from pyspark.sql import functions as F

df_final = (df_groupby2
            .withColumn('text', F.concat(F.lit("there are "),
                                         df_groupby2['count'],
                                         F.lit(" "),
                                         df_groupby2['color'],
                                         F.lit(" "),
                                         df_groupby2['make'],
                                         F.lit(" "),
                                         df_groupby2['model'],
                                         F.lit(" "),
                                         df_groupby2['year'])
                        )
            .sort(df_groupby2['center'],
                  df_groupby2['count'].desc()
                  )
            )
for row in df_final.select('center').distinct().sort('center').collect():
    current_center = row['center']
    print(current_center, ":")
    for inner_row in df_final.filter(df_final['center'] == current_center).collect():
        print(inner_row['text'])
center_a :
there are 33 blue make_c model_b 2018
there are 24 blue make_a model_c 2018
there are 23 red make_a model_a 2018
there are 21 black make_b model_b 2018
there are 21 black make_c model_a 2018
center_b :
there are 31 red make_a model_a 2018
there are 24 blue make_b model_a 2018
there are 24 black make_c model_c 2018
there are 23 black make_b model_b 2018
there are 23 blue make_c model_c 2018
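One caveat on the printing loop above: it runs one filter/collect round trip per center. Since the top-5 result is small, you could collect once and group client-side instead. A sketch with itertools.groupby over a hypothetical stand-in for the already-sorted collected rows (groupby requires its input sorted by the key):

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical stand-in for df_final.collect(), already sorted by center.
collected = [
    {'center': 'center_a', 'text': 'there are 33 blue make_c model_b 2018'},
    {'center': 'center_a', 'text': 'there are 24 blue make_a model_c 2018'},
    {'center': 'center_b', 'text': 'there are 31 red make_a model_a 2018'},
]

# One pass, one header line per center, then that center's rows.
lines = []
for center, rows in groupby(collected, key=itemgetter('center')):
    lines.append(center + " :")
    lines.extend(r['text'] for r in rows)
print("\n".join(lines))
```

This trades N+1 Spark jobs for a single collect, which is usually the better deal once the result is guaranteed small (here, at most 5 rows per center).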