我有一个看起来像这样的数据名声
Sno|UserID|TypeExp
1|JAS123|MOVIE
2|ASP123|GAMES
3|JAS123|CLOTHING
4|DPS123|MOVIE
5|DPS123|CLOTHING
6|ASP123|MEDICAL
7|JAS123|OTH
8|POQ133|MEDICAL
.......
10000|DPS123|OTH
用户ID是识别用户的列,typeexp列定义了该月的支出类型,现在我有5个不同的支出,即
typexplist = [电影,游戏,服装,医疗,oth]
现在,我想将其转换为用户级别的数据框架,其中具有0或1个二进制变量,以存储信息天气,而不是用户" x"完成了上述类型的支出
例如,在上面的快照数据帧输出中应该看起来像
User| TypeExpList #Type list is this array corresponding entry's [MOVIE,GAMES,CLOTHING,MEDICAL,OTH]
JAS123 |[1,0,1,0,1] #since user has done expenditure on Movie,CLOTHING,OTHER Category
ASP123 |[0,1,0,1,0] #since User expenditure on GAMES & MEDICAL
DPS123 |[1,0,1,0,1] #since user expenditure on MOVIE,CLOTHING & OTHER
POQ133 |[0,0,0,1,0] #since User Expenditure on MEDICAL only
这是您的输入数据集。
$ cat input.csv
Sno|UserID|TypeExp
1|JAS123|MOVIE
2|ASP123|GAMES
3|JAS123|CLOTHING
4|DPS123|MOVIE
5|DPS123|CLOTHING
6|ASP123|MEDICAL
7|JAS123|OTH
8|POQ133|MEDICAL
这样,您可以在UserID
上对pivot
进行CC_1。
val bins = spark
.read
.option("sep", "|")
.option("header", true)
.csv("input.csv")
.groupBy("UserID")
.pivot("TypeExp")
.count
.na
.fill(0)
scala> bins.show
+------+--------+-----+-------+-----+---+
|UserID|CLOTHING|GAMES|MEDICAL|MOVIE|OTH|
+------+--------+-----+-------+-----+---+
|POQ133| 0| 0| 1| 0| 0|
|JAS123| 1| 0| 0| 1| 1|
|DPS123| 1| 0| 0| 1| 0|
|ASP123| 0| 1| 1| 0| 0|
+------+--------+-----+-------+-----+---+
您有0
S和1
s。最后的技巧是在列上使用array
来构建适当的输出,其中位置表示支出。
val solution = bins.select(
$"UserID" as "User",
array("MOVIE","GAMES","CLOTHING","MEDICAL","OTH") as "TypeExpList")
scala> solution.show
+------+---------------+
| User| TypeExpList|
+------+---------------+
|POQ133|[0, 0, 0, 1, 0]|
|JAS123|[1, 0, 1, 0, 1]|
|DPS123|[1, 0, 1, 0, 0]|
|ASP123|[0, 1, 0, 1, 0]|
+------+---------------+
鉴于支出可能发生零,一次或多次,支出的count
可以给出0
,1
或更高的数字。
您可以使用UDF对值进行二分化,并确保仅使用0
S和1
S。
val binarizer = udf { count: Long => if (count > 0) 1 else 0 }
val binaryCols = bins
.columns
.filterNot(_ == "UserID")
.map(col)
.map(c => binarizer(c) as c.toString)
val selectCols = ($"UserID" as "User") +: binaryCols
val solution = bins
.select(selectCols: _*)
.select(
$"User",
array("MOVIE","GAMES","CLOTHING","MEDICAL","OTH") as "TypeExpList")
scala> solution.show
+------+---------------+
| User| TypeExpList|
+------+---------------+
|POQ133|[0, 0, 0, 1, 0]|
|JAS123|[1, 0, 1, 0, 1]|
|DPS123|[1, 0, 1, 0, 0]|
|ASP123|[0, 1, 0, 1, 0]|
+------+---------------+
crosstab
将完成大部分工作:
val table = df.stat.crosstab("UserID", "TypeExp")
+--------------+--------+-----+-------+-----+---+
|UserID_TypeExp|CLOTHING|GAMES|MEDICAL|MOVIE|OTH|
+--------------+--------+-----+-------+-----+---+
| ASP123| 0| 1| 1| 0| 0|
| DPS123| 1| 0| 0| 1| 0|
| JAS123| 1| 0| 0| 1| 1|
| POQ133| 0| 0| 1| 0| 0|
+--------------+--------+-----+-------+-----+---+
可以很好地与强键入API结合:
table.map(_.toSeq match {
case Seq(id: String, cnts @ _*) =>
(id, cnts.map(c => if(c != 0) 1 else 0))}).toDF("UserId", "TypeExp")
+------+---------------+
|UserId| TypeExp|
+------+---------------+
|ASP123|[0, 1, 1, 0, 0]|
|DPS123|[1, 0, 0, 1, 0]|
|JAS123|[1, 0, 0, 1, 1]|
|POQ133|[0, 0, 1, 0, 0]|
+------+---------------+
该解决方案在Scala中,但在Pyspark中也应该有些相似,因为它使用了DSL。枢轴可提供SPARK 1.6
val pivotDf = df.groupBy($"userid").pivot("typeexp").agg(count($"typeexp") )
pivotDf.show
+------+--------+-----+-------+-----+---+
|userid|CLOTHING|GAMES|MEDICAL|MOVIE|OTH|
+------+--------+-----+-------+-----+---+
|DPS123| 1| 0| 0| 1| 0|
|JAS123| 1| 0| 0| 1| 1|
|ASP123| 0| 1| 1| 0| 0|
|POQ133| 0| 0| 1| 0| 0|
+------+--------+-----+-------+-----+---+
pivotDf.selectExpr("userid", "array(movie, games, clothing, medical,oth) as TypExpList")
.show
+------+---------------+
|userid| TypExpList|
+------+---------------+
|DPS123|[1, 0, 1, 0, 0]|
|JAS123|[1, 0, 1, 0, 1]|
|ASP123|[0, 1, 0, 1, 0]|
|POQ133|[0, 0, 0, 1, 0]|
+------+---------------+