在SparkR中,我有一个DataFrame data
,它包含每个动作的user
、act
和time
。act
包含从1到9的数字,这意味着我们有9个动作。
head(data)
然后给出
user act time
21 1 2012-01-05
14 8 2013-05-04
21 1 2013-01-04
84 4 2012-02-02
对于每个user
,我想获得前60天的所有act
。
例如用户21 filter(data, data$user==21)
有这些行为吗
user act time
21 1 2012-01-05
21 1 2013-01-04
21 7 2013-01-05
在这里,我只想要第一幕,因为其他两幕已经60多天了。
我可以通过这个代码找到每个用户的出生(第一次出现act
)
userbirth <- groupBy(data, data$user) %>% agg(min(data$time))
但我不知道如何为每个CCD_ 11获得仅包含前60天的CCD_。
我试着用这种方法解决问题
g <- groupBy(data, data$user)
result <- agg(g, data$time < min(data$time) +60 )
但是R给了我一条错误消息:"returnstatus==0不是True。"我该如何解决这个问题?
如果您对SparkR之外但在R和Spark内部的答案持开放态度,则可以使用dplyr.Spark.hive,dplyr的后端。它们支持窗口功能,这正是您所需要的。下面是mtcars数据集的一个例子
> group_by(mtcars, carb) %>% filter(mpg <= min(mpg) + 5) %>% arrange(carb, mpg)
Source: local data frame [18 x 11]
Groups: carb [6]
mpg cyl disp hp drat wt qsec vs am gear carb
(dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl)
1 18.1 6 225.0 105 2.76 3.460 20.22 1 0 3 1
2 21.4 6 258.0 110 3.08 3.215 19.44 1 0 3 1
3 21.5 4 120.1 97 3.70 2.465 20.01 1 0 3 1
4 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
5 15.2 8 304.0 150 3.15 3.435 17.30 0 0 3 2
6 15.5 8 318.0 150 2.76 3.520 16.87 0 0 3 2
7 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
8 19.2 8 400.0 175 3.08 3.845 17.05 0 0 3 2
9 15.2 8 275.8 180 3.07 3.780 18.00 0 0 3 3
10 16.4 8 275.8 180 3.07 4.070 17.40 0 0 3 3
11 17.3 8 275.8 180 3.07 3.730 17.60 0 0 3 3
12 10.4 8 472.0 205 2.93 5.250 17.98 0 0 3 4
13 10.4 8 460.0 215 3.00 5.424 17.82 0 0 3 4
14 13.3 8 350.0 245 3.73 3.840 15.41 0 0 3 4
15 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
16 14.7 8 440.0 230 3.23 5.345 17.42 0 0 3 4
17 19.7 6 145.0 175 3.62 2.770 15.50 0 1 5 6
18 15.0 8 301.0 335 3.54 3.570 14.60 0 1 5 8
在每一组化油器数量相同的汽车中,我们选择所有油耗在每组最低值5英里/加仑以内的汽车。
您与userbirth
关系密切;您只需要使用join
将这个新的min(time)
列包含到您的初始DF中。
这里有一个完全可复制的例子,在您显示的记录中添加更多的记录,以便获得清晰的演示:
library(magrittr)
user <- c(21, 14, 21, 84, 21, 21, 14, 14)
act <- c(1, 8, 1, 4, 7, 9, 1, 3)
time <- c("2012-01-05", "2013-05-04", "2013-01-04", "2012-02-02", "2013-01-05", "2012-02-10", "2013-05-20", "2013-07-10")
df_local <- data.frame(user, act, time)
df_local
# user act time
# 1 21 1 2012-01-05
# 2 14 8 2013-05-04
# 3 21 1 2013-01-04
# 4 84 4 2012-02-02
# 5 21 7 2013-01-05
# 6 21 9 2012-02-10
# 7 14 1 2013-05-20
# 8 14 3 2013-07-10
df <- createDataFrame(sqlContext, df_local)
df$time <- to_date(df$time)
df$user <- cast(df$user, "integer")
df$act <- cast(df$act, "integer")
df
# DataFrame[user:int, act:int, time:date]
userbirth <- groupBy(df, df$user) %>% agg(min(df$time))
names(userbirth) <- c("user_", "min_time") # works, although undocumented!
userbirth
# DataFrame[user_:int, min_time:date]
showDF(userbirth)
# +-----+----------+
# |user_| min_time|
# +-----+----------+
# | 84|2012-02-02|
# | 14|2013-05-04|
# | 21|2012-01-05|
# +-----+----------+
df2 <- join(df, userbirth, df$user == userbirth$user_)
showDF(df2)
# +----+---+----------+-----+----------+
# |user|act| time|user_| min_time|
# +----+---+----------+-----+----------+
# | 84| 4|2012-02-02| 84|2012-02-02|
# | 14| 8|2013-05-04| 14|2013-05-04|
# | 14| 1|2013-05-20| 14|2013-05-04|
# | 14| 3|2013-07-10| 14|2013-05-04|
# | 21| 1|2012-01-05| 21|2012-01-05|
# | 21| 1|2013-01-04| 21|2012-01-05|
# | 21| 7|2013-01-05| 21|2012-01-05|
# | 21| 9|2012-02-10| 21|2012-01-05|
# +----+---+----------+-----+----------+
在继续之前,让我们根据上面的df2
数据来检查预期结果:
- 用户
84
的唯一记录 - 用户
14
2013年5月的两条记录 - 用户
21
2012年的两项记录
让我们看看(我们使用SparkR date_add
函数):
df3 <- filter(df2, df2$time <= date_add(df2$min_time, 60))
showDF(df3)
# +----+---+----------+-----+----------+
# |user|act| time|user_| min_time|
# +----+---+----------+-----+----------+
# | 84| 4|2012-02-02| 84|2012-02-02|
# | 14| 8|2013-05-04| 14|2013-05-04|
# | 14| 1|2013-05-20| 14|2013-05-04|
# | 21| 1|2012-01-05| 21|2012-01-05|
# | 21| 9|2012-02-10| 21|2012-01-05|
# +----+---+----------+-----+----------+
从这一点来看,我们可以只保留选定的列,或多或少就像我们在普通R数据帧中所做的那样:
df4 <- df3[,c("user", "act","time")]
showDF(df4)
# +----+---+----------+
# |user|act| time|
# +----+---+----------+
# | 84| 4|2012-02-02|
# | 14| 8|2013-05-04|
# | 14| 1|2013-05-20|
# | 21| 1|2012-01-05|
# | 21| 9|2012-02-10|
# +----+---+----------+
注意,在创建Spark数据帧df
之后,所有操作都是SparkR操作(而不是"本地"R):
class(df4)
# [1] "DataFrame"
# attr(,"package")
# [1] "SparkR"
df4
# DataFrame[user:int, act:int, time:date]
请随时回复您可能需要的任何澄清。。。
sessionInfo()
# R version 3.2.2 (2015-08-14)
# Platform: i686-pc-linux-gnu (32-bit)
# Running under: Ubuntu 14.04.2 LTS
# [...]
# other attached packages:
# [1] magrittr_1.5 SparkR_1.5.1