I have to use PySpark to find the products most often sold together in a DataFrame named sales_NY. A sample:
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+-----+----+
|OrderID| Product|Quantity| Price| OrderDate| StoreAddress| City|State|Month|Hour|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+-----+----+
| 295665| Macbook Pro Laptop| 1|1700.0|2019-12-30 00:01:00|136 Church St, Ne...|New York City| NY| 12| 0|
| 295666| LG Washing Machine| 1| 600.0|2019-12-29 07:03:00|562 2nd St, New Y...|New York City| NY| 12| 7|
| 295667|USB-C Charging Cable| 1| 11.95|2019-12-12 18:21:00|277 Main St, New ...|New York City| NY| 12| 18|
| 295670|AA Batteries (4-p...| 1| 3.84|2019-12-31 22:58:00|200 Jefferson St,...|New York City| NY| 12| 22|
| 295698| Vareebadd Phone| 1| 400.0|2019-12-13 14:32:00|175 1st St, New Y...|New York City| NY| 12| 14|
| 295698|USB-C Charging Cable| 2| 11.95|2019-12-13 14:32:00|175 1st St, New Y...|New York City| NY| 12| 14|
| 295700|Bose SoundSport H...| 1| 99.99|2019-12-25 19:02:00|363 Hickory St, N...|New York City| NY| 12| 19|
| 295704| Wired Headphones| 1| 11.99|2019-12-12 00:20:00|457 8th St, New Y...|New York City| NY| 12| 0|
| 295705| Wired Headphones| 1| 11.99|2019-12-25 10:41:00|133 Jackson St, N...|New York City| NY| 12| 10|
| 295712| Macbook Pro Laptop| 1|1700.0|2019-12-10 20:02:00|331 Madison St, N...|New York City| NY| 12| 20|
| 295713|Bose SoundSport H...| 1| 99.99|2019-12-24 07:55:00|490 Spruce St, Ne...|New York City| NY| 12| 7|
| 295720|AA Batteries (4-p...| 1| 3.84|2019-12-17 22:52:00|298 Ridge St, New...|New York City| NY| 12| 22|
| 295728| 27in FHD Monitor| 1|149.99|2019-12-21 19:21:00|366 Washington St...|New York City| NY| 12| 19|
| 295735| iPhone| 1| 700.0|2019-12-22 18:25:00|374 Lincoln St, N...|New York City| NY| 12| 18|
| 295735|Apple Airpods Hea...| 1| 150.0|2019-12-22 18:25:00|374 Lincoln St, N...|New York City| NY| 12| 18|
| 295735| Wired Headphones| 1| 11.99|2019-12-22 18:25:00|374 Lincoln St, N...|New York City| NY| 12| 18|
| 295740|USB-C Charging Cable| 1| 11.95|2019-12-01 20:36:00|102 Cedar St, New...|New York City| NY| 12| 20|
| 295742|Apple Airpods Hea...| 1| 150.0|2019-12-09 23:45:00|368 Sunset St, Ne...|New York City| NY| 12| 23|
| 295743|USB-C Charging Cable| 1| 11.95|2019-12-03 11:52:00|346 South St, New...|New York City| NY| 12| 11|
| 295745| Flatscreen TV| 1| 300.0|2019-12-24 10:38:00|124 Lakeview St, ...|New York City| NY| 12| 10|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+-----+----+
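To find the products most often sold together, I first load the data into memory with the code below (sorry for the lack of reproducibility, but the full dataset is too large to include here).
Common part: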
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, size, hour
spark = (SparkSession.builder.appName('SalesAnalytics').getOrCreate())
### This is the local path to my data:
file_path = './data/output/sales/ReportYear=2019'
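sales_raw_df = (spark.read.format('parquet')
.option('header', 'True')       # CSV-only option; ignored by the Parquet reader
.option('inferSchema', 'True')  # CSV-only option; Parquet files carry their own schema
.load(file_path))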
sales_raw_df = sales_raw_df.withColumn('Hour', hour(sales_raw_df.OrderDate))
sales_NY = (sales_raw_df.where(col('State') == 'NY'))
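Now I can follow two different versions of the solution, which I thought were completely equivalent, but their outputs differ slightly. The only difference is that the second version adds an intermediate step, orderBy('OrderID', 'Product').
Version 1: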
sales_q4_df = (sales_NY.groupBy('OrderID', 'State')
.agg(collect_list('Product').alias('ProductList')))
sales_q4_df = (sales_q4_df.withColumn('ProductListSize', size('ProductList')))
### discards the orders with a single product (the OrderID just appears once)
sales_q4_df = sales_q4_df.filter(col('ProductListSize') > 1).orderBy('ProductList', ascending=True)
# show() returns None, so keep the DataFrame and display it separately
most_prods_together = sales_q4_df.groupBy('ProductList').count().orderBy('count', ascending=False)
most_prods_together.show(10, False)
Output 1:
+------------------------------------------------------+-----+
|ProductList |count|
+------------------------------------------------------+-----+
|[iPhone, Lightning Charging Cable] |126 |
|[Google Phone, USB-C Charging Cable] |124 |
|[Google Phone, Wired Headphones] |52 |
|[Vareebadd Phone, USB-C Charging Cable] |49 |
|[iPhone, Wired Headphones] |46 |
|[iPhone, Apple Airpods Headphones] |43 |
|[Google Phone, Bose SoundSport Headphones] |23 |
|[Vareebadd Phone, Wired Headphones] |17 |
|[Apple Airpods Headphones, Wired Headphones] |12 |
|[Google Phone, USB-C Charging Cable, Wired Headphones]|11 |
+------------------------------------------------------+-----+
Version 2:
sales_q4_df = (sales_NY.orderBy('OrderID', 'Product')
.groupBy('OrderID', 'State')
.agg(collect_list('Product').alias('ProductList')))
sales_q4_df = (sales_q4_df.withColumn('ProductListSize', size('ProductList')))
### discards the orders with a single product (the OrderID just appears once)
sales_q4_df = sales_q4_df.filter(col('ProductListSize') > 1).orderBy('ProductList', ascending=True)
# show() returns None, so keep the DataFrame and display it separately
most_prods_together = sales_q4_df.groupBy('ProductList').count().orderBy('count', ascending=False)
most_prods_together.show(10, False)
Output 2:
+-------------------------------------------------+-----+
|ProductList |count|
+-------------------------------------------------+-----+
|[Google Phone, USB-C Charging Cable] |127 |
|[Lightning Charging Cable, iPhone] |126 |
|[Google Phone, Wired Headphones] |53 |
|[USB-C Charging Cable, Vareebadd Phone] |50 |
|[Wired Headphones, iPhone] |46 |
|[Apple Airpods Headphones, iPhone] |45 |
|[Bose SoundSport Headphones, Google Phone] |24 |
|[Apple Airpods Headphones, Wired Headphones] |19 |
|[Vareebadd Phone, Wired Headphones] |17 |
|[AA Batteries (4-pack), Lightning Charging Cable]|16 |
+-------------------------------------------------+-----+
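Can anyone explain why the results differ? Is this a bug in PySpark?
I'm using JupyterLab 3.4.2, PySpark 3.0.1 and Java 15 on a laptop.
PS: I should add that I also tried the sort() method (which I expected to be more efficient than orderBy() because several partitions are used), but the result was the same.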
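As Emma mentioned, using groupBy on a list column whose element order is random can return strange results: [a, b] and [b, a] are two different values.
You can apply array_sort to the output of collect_list; then the groupBy should return the same result every time.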
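A minimal plain-Python illustration of the point (no Spark needed; the collected lists are hypothetical). Grouping on the raw list splits the same basket across two keys, while sorting each list first, which is what array_sort does per row in Spark, merges them. In PySpark the corresponding change would be along the lines of `.agg(array_sort(collect_list('Product')).alias('ProductList'))`, with `array_sort` imported from `pyspark.sql.functions`.

```python
from collections import Counter

# Hypothetical collected product lists, one per order. The same basket
# can come out of collect_list in either order after a shuffle.
collected = [
    ["Google Phone", "Bose SoundSport Headphones"],
    ["Google Phone", "Bose SoundSport Headphones"],
    ["Bose SoundSport Headphones", "Google Phone"],
    ["iPhone", "Lightning Charging Cable"],
]

# Grouping on the list as-is: the element order is part of the key.
raw_counts = Counter(tuple(items) for items in collected)

# Grouping on the sorted list (the per-row effect of array_sort).
sorted_counts = Counter(tuple(sorted(items)) for items in collected)

print(raw_counts)
print(sorted_counts)
```

Python's default string sort compares code points, so uppercase sorts before lowercase and "Lightning Charging Cable" lands before "iPhone", matching the ordering visible in Output 2.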
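That said, groupBy on lists is not the most accurate way to answer the question "which products are most often sold together?". ['iphone', 'iphone charger', 'gum'] and ['iphone', 'iphone charger'] are two different rows in your result DataFrame, but the association between iphone and iphone charger is probably what you are looking for.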
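One way to count co-occurrence regardless of what else is in the basket is to expand each order into all unordered pairs of its products and count the pairs. A plain-Python sketch of that idea with hypothetical baskets; FP-Growth generalizes it to frequent itemsets of any size.

```python
from collections import Counter
from itertools import combinations

# Hypothetical baskets (one list of products per order).
orders = [
    ["iphone", "iphone charger", "gum"],
    ["iphone", "iphone charger"],
    ["iphone", "headphones"],
]

pair_counts = Counter()
for products in orders:
    # De-duplicate and sort so ("a", "b") and ("b", "a") become the same key.
    for pair in combinations(sorted(set(products)), 2):
        pair_counts[pair] += 1

print(pair_counts.most_common(3))
```

Here ("iphone", "iphone charger") is counted twice, even though the two orders containing it are different lists.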
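You can use FPGrowth from pyspark.ml.fpm, which returns the frequent itemsets (pairs or other sizes) of items usually bought together, as well as association rules.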
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import size, col
df = spark.createDataFrame(
[
[1, ["tomato", "cucumber", "onion"]],
[2, ["cucumber", "avocado", "tomato", "olive oil"]],
[3, ["cucumber", "tomato", "lettuce", "onion"]],
[4, ["lettuce", "onion"]],
[5, ["olive oil", "bread"]],
[6, ["onion", "olive oil", "lettuce"]]
], ["order_id", "products"]
)
fpGrowth = FPGrowth(itemsCol="products", minSupport=0.2, minConfidence=0.75)
model = fpGrowth.fit(df)
model.freqItemsets.filter(size("items") > 1).orderBy(col("freq").desc()).show()
+--------------------+----+
| items|freq|
+--------------------+----+
| [lettuce, onion]| 3|
| [tomato, cucumber]| 3|
| [cucumber, onion]| 2|
|[tomato, cucumber...| 2|
| [tomato, onion]| 2|
+--------------------+----+
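To see where the freq column comes from, here is a brute-force count over the same six orders. This is plain Python and deliberately not how FP-Growth works internally (FP-Growth avoids enumerating every combination); it only reproduces the result. With minSupport=0.2 and 6 orders, an itemset needs to appear in at least 2 orders (2/6 ≈ 0.33, while 1/6 ≈ 0.17 is too low).

```python
from itertools import combinations

# The same six orders as in the FPGrowth example.
orders = [
    ["tomato", "cucumber", "onion"],
    ["cucumber", "avocado", "tomato", "olive oil"],
    ["cucumber", "tomato", "lettuce", "onion"],
    ["lettuce", "onion"],
    ["olive oil", "bread"],
    ["onion", "olive oil", "lettuce"],
]
min_support = 0.2

# Count every itemset of size >= 2 that occurs in some order.
counts = {}
for products in orders:
    items = sorted(set(products))
    for k in range(2, len(items) + 1):
        for itemset in combinations(items, k):
            counts[itemset] = counts.get(itemset, 0) + 1

# Keep the itemsets whose support (fraction of orders) reaches min_support.
frequent = {
    itemset: freq
    for itemset, freq in counts.items()
    if freq / len(orders) >= min_support
}
for itemset, freq in sorted(frequent.items(), key=lambda kv: -kv[1]):
    print(itemset, freq)
```

It finds the same five itemsets and counts as the table; only the order of items inside each itemset differs.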
You may also be interested in model.associationRules.show().
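Among other columns, associationRules reports a confidence for each rule antecedent → consequent, computed as freq(antecedent ∪ consequent) / freq(antecedent), and only rules whose confidence reaches minConfidence (0.75 here) are kept. A plain-Python sketch of that formula on the same six orders; the helper names `freq` and `confidence` are illustrative, not part of the Spark API.

```python
# The same six orders as in the FPGrowth example, as sets.
orders = [
    {"tomato", "cucumber", "onion"},
    {"cucumber", "avocado", "tomato", "olive oil"},
    {"cucumber", "tomato", "lettuce", "onion"},
    {"lettuce", "onion"},
    {"olive oil", "bread"},
    {"onion", "olive oil", "lettuce"},
]


def freq(itemset):
    """Number of orders containing every item in itemset."""
    return sum(1 for order in orders if set(itemset) <= order)


def confidence(antecedent, consequent):
    """Share of orders with the antecedent that also contain the consequent."""
    return freq(set(antecedent) | {consequent}) / freq(antecedent)


print(confidence(["lettuce"], "onion"))   # 3 / 3
print(confidence(["onion"], "lettuce"))   # 3 / 4
print(confidence(["cucumber"], "onion"))  # 2 / 3, below minConfidence=0.75
```

Confidence is not symmetric: every order with lettuce has onion, but only three of the four orders with onion have lettuce.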
This is because Spark DataFrames are unordered.
In version 1 there is no orderBy, so collect_list can collect the products in any order.
sales_q4_df = (sales_NY.groupBy('OrderID', 'State')
.agg(collect_list('Product').alias('ProductList')))
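Look at the entry [Google Phone, Bose SoundSport Headphones] in the version 1 list: "Google Phone" comes before "Bose SoundSport Headphones", and its count is 23.
I'd guess there is also an entry [Bose SoundSport Headphones, Google Phone] with a count of 1 further down the full result, outside the 10 rows shown.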
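In version 2 you added .orderBy('OrderID', 'Product'), which in practice makes the list order consistent, so you are now counting the correct set [Bose SoundSport Headphones, Google Phone] as 24.
Note, however, that Spark does not formally guarantee that collect_list preserves a sort applied before the groupBy, because the row order can change after a shuffle; sorting the collected list itself (array_sort) is the more robust fix.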
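A plain-Python simulation of what version 2 relies on, with hypothetical (OrderID, Product) rows: if the rows reach the collecting step already sorted by (OrderID, Product), each collected list comes out sorted. Sorting each list after collecting, the array_sort-style fix, gives the same lists without depending on row order at all. The helper `collect_in_encounter_order` is illustrative and only mimics collect_list.

```python
from collections import defaultdict

# Hypothetical (OrderID, Product) rows in arbitrary arrival order.
rows = [
    (295735, "iPhone"),
    (295698, "Vareebadd Phone"),
    (295735, "Wired Headphones"),
    (295698, "USB-C Charging Cable"),
    (295735, "Apple Airpods Headphones"),
]


def collect_in_encounter_order(rows):
    """Mimic collect_list: append products in the order the rows are seen."""
    lists = defaultdict(list)
    for order_id, product in rows:
        lists[order_id].append(product)
    return dict(lists)


# Version 1: lists keep whatever order the rows arrived in.
v1 = collect_in_encounter_order(rows)

# Version 2: sort rows by (OrderID, Product) first, then collect.
v2 = collect_in_encounter_order(sorted(rows))

# array_sort-style fix: collect in any order, then sort each list.
fixed = {oid: sorted(products) for oid, products in v1.items()}

print(v1)
print(v2)
print(fixed == v2)
```

The sorted lists, for example [USB-C Charging Cable, Vareebadd Phone], have the same shape as the keys in Output 2.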