我有一个数据集,该数据集的公司每日生产速率。我想在该数据框架中添加一列,该列将根据该公司按日期依次根据该公司进行数字。
ex。
Acme Product1 1/1/2000 5
Acme Product1 1/2/2000 7
Acme Product2 3/1/2000 9
Acme Product2 3/2/2000 4
Company2 ProductX 4/1/2015 6
Company2 ProductX 4/2/2015 3
我想添加一个新列,例如:
Acme Product1 1/1/2000 5 1
Acme Product1 1/2/2000 7 2
Acme Product2 3/1/2000 9 1
Acme Product2 3/2/2000 4 2
Company2 ProductX 4/1/2015 6 1
Company2 ProductX 4/2/2015 3 2
Company2 ProductX 4/2/2015 2 3
这就是这样,我可以根据新专栏比较公司及其产品。因此,无论日期如何
您可以使用pyspark.sql函数row_number
使用窗口(别名rowNumber
适用于spark <= 1.6.X
)。
首先,让我们创建数据框:
myDF = spark.createDataFrame(
sc.parallelize([["Acme", "Product1", "1/1/2000", 5],
["Acme", "Product1", "1/2/2000", 7],
["Acme", "Product2", "3/1/2000", 9],
["Acme", "Product2", "3/2/2000", 4],
["Company2", "ProductX", "4/1/2015", 6],
["Company2", "ProductX", "4/2/2015", 3],
["Company2", "ProductX", "4/2/2015", 2]]),
["company", "product", "date", "nb"])
+--------+--------+--------+---+
| company| product| date| nb|
+--------+--------+--------+---+
| Acme|Product1|1/1/2000| 5|
| Acme|Product1|1/2/2000| 7|
| Acme|Product2|3/1/2000| 9|
| Acme|Product2|3/2/2000| 4|
|Company2|ProductX|4/1/2015| 6|
|Company2|ProductX|4/2/2015| 3|
|Company2|ProductX|4/2/2015| 2|
+--------+--------+--------+---+
现在使用窗口函数:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
resultDF = myDF.withColumn(
"rowNum",
F.row_number().over(Window.partitionBy("company", "product")
.orderBy("date")))
+--------+--------+--------+---+------+
| company| product| date| nb|rowNum|
+--------+--------+--------+---+------+
|Company2|ProductX|4/1/2015| 6| 1|
|Company2|ProductX|4/2/2015| 3| 2|
|Company2|ProductX|4/2/2015| 2| 3|
| Acme|Product2|3/1/2000| 9| 1|
| Acme|Product2|3/2/2000| 4| 2|
| Acme|Product1|1/1/2000| 5| 1|
| Acme|Product1|1/2/2000| 7| 2|
+--------+--------+--------+---+------+