r-如何过滤SparkR数据帧



在SparkR中,我有一个DataFrame data,它包含每个动作的useracttimeact包含从1到9的数字,这意味着我们有9个动作。

head(data)

然后给出

user  act  time
21      1  2012-01-05
14      8  2013-05-04
21      1  2013-01-04
84      4  2012-02-02

对于每个user,我想获得前60天的所有act

例如用户21 filter(data, data$user==21)

有这些行为吗

user   act   time
21     1     2012-01-05
21     1     2013-01-04
21     7     2013-01-05

在这里,我只想要第一幕,因为其他两幕已经60多天了。

我可以通过这个代码找到每个用户的出生(第一次出现act

userbirth <- groupBy(data, data$user) %>% agg(min(data$time))

但我不知道如何为每个CCD_ 11获得仅包含前60天的CCD_。

我试着用这种方法解决问题

g <- groupBy(data, data$user)
result <- agg(g, data$time < min(data$time) +60 )

但是R给了我一条错误消息:"returnstatus==0不是True。"我该如何解决这个问题?

如果您对SparkR之外但在R和Spark内部的答案持开放态度,则可以使用dplyr.Spark.hive,dplyr的后端。它们支持窗口功能,这正是您所需要的。下面是mtcars数据集的一个例子

> group_by(mtcars, carb)  %>% filter(mpg <= min(mpg) + 5)  %>% arrange(carb, mpg)
Source: local data frame [18 x 11]
Groups: carb [6]
     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
   (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl) (dbl)
1   18.1     6 225.0   105  2.76 3.460 20.22     1     0     3     1
2   21.4     6 258.0   110  3.08 3.215 19.44     1     0     3     1
3   21.5     4 120.1    97  3.70 2.465 20.01     1     0     3     1
4   22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
5   15.2     8 304.0   150  3.15 3.435 17.30     0     0     3     2
6   15.5     8 318.0   150  2.76 3.520 16.87     0     0     3     2
7   18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
8   19.2     8 400.0   175  3.08 3.845 17.05     0     0     3     2
9   15.2     8 275.8   180  3.07 3.780 18.00     0     0     3     3
10  16.4     8 275.8   180  3.07 4.070 17.40     0     0     3     3
11  17.3     8 275.8   180  3.07 3.730 17.60     0     0     3     3
12  10.4     8 472.0   205  2.93 5.250 17.98     0     0     3     4
13  10.4     8 460.0   215  3.00 5.424 17.82     0     0     3     4
14  13.3     8 350.0   245  3.73 3.840 15.41     0     0     3     4
15  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
16  14.7     8 440.0   230  3.23 5.345 17.42     0     0     3     4
17  19.7     6 145.0   175  3.62 2.770 15.50     0     1     5     6
18  15.0     8 301.0   335  3.54 3.570 14.60     0     1     5     8

在每一组化油器数量相同的汽车中,我们选择所有油耗在每组最低值5英里/加仑以内的汽车。

您与userbirth关系密切;您只需要使用join将这个新的min(time)列包含到您的初始DF中。

这里有一个完全可复制的例子,在您显示的记录中添加更多的记录,以便获得清晰的演示:

library(magrittr)
user <- c(21, 14, 21, 84, 21, 21, 14, 14)
act <- c(1, 8, 1, 4, 7, 9, 1, 3)
time <- c("2012-01-05", "2013-05-04", "2013-01-04", "2012-02-02", "2013-01-05", "2012-02-10", "2013-05-20", "2013-07-10")
df_local <- data.frame(user, act, time)
df_local
#   user act       time
# 1   21   1 2012-01-05
# 2   14   8 2013-05-04
# 3   21   1 2013-01-04
# 4   84   4 2012-02-02
# 5   21   7 2013-01-05
# 6   21   9 2012-02-10
# 7   14   1 2013-05-20
# 8   14   3 2013-07-10
df <- createDataFrame(sqlContext, df_local)
df$time <- to_date(df$time)
df$user <- cast(df$user, "integer")
df$act <- cast(df$act, "integer")
df
# DataFrame[user:int, act:int, time:date]
userbirth <- groupBy(df, df$user) %>% agg(min(df$time))
names(userbirth) <- c("user_", "min_time")  # works, although undocumented!
userbirth
# DataFrame[user_:int, min_time:date]
showDF(userbirth)
# +-----+----------+
# |user_|  min_time|
# +-----+----------+
# |   84|2012-02-02|
# |   14|2013-05-04|
# |   21|2012-01-05|
# +-----+----------+    
df2 <- join(df, userbirth, df$user == userbirth$user_) 
showDF(df2)
# +----+---+----------+-----+----------+
# |user|act|      time|user_|  min_time|
# +----+---+----------+-----+----------+
# |  84|  4|2012-02-02|   84|2012-02-02|
# |  14|  8|2013-05-04|   14|2013-05-04|
# |  14|  1|2013-05-20|   14|2013-05-04|
# |  14|  3|2013-07-10|   14|2013-05-04|
# |  21|  1|2012-01-05|   21|2012-01-05|
# |  21|  1|2013-01-04|   21|2012-01-05|
# |  21|  7|2013-01-05|   21|2012-01-05|
# |  21|  9|2012-02-10|   21|2012-01-05|
# +----+---+----------+-----+----------+

在继续之前,让我们根据上面的df2数据来检查预期结果:

  • 用户84的唯一记录
  • 用户14 2013年5月的两条记录
  • 用户21 2012年的两项记录

让我们看看(我们使用SparkR date_add函数):

df3 <- filter(df2, df2$time <= date_add(df2$min_time, 60))
showDF(df3)
# +----+---+----------+-----+----------+
# |user|act|      time|user_|  min_time|
# +----+---+----------+-----+----------+
# |  84|  4|2012-02-02|   84|2012-02-02|
# |  14|  8|2013-05-04|   14|2013-05-04|
# |  14|  1|2013-05-20|   14|2013-05-04|
# |  21|  1|2012-01-05|   21|2012-01-05|
# |  21|  9|2012-02-10|   21|2012-01-05|
# +----+---+----------+-----+----------+

从这一点来看,我们可以只保留选定的列,或多或少就像我们在普通R数据帧中所做的那样:

df4 <- df3[,c("user", "act","time")]
showDF(df4)
# +----+---+----------+
# |user|act|      time|
# +----+---+----------+
# |  84|  4|2012-02-02|
# |  14|  8|2013-05-04|
# |  14|  1|2013-05-20|
# |  21|  1|2012-01-05|
# |  21|  9|2012-02-10|
# +----+---+----------+

注意,在创建Spark数据帧df之后,所有操作都是SparkR操作(而不是"本地"R):

class(df4)
# [1] "DataFrame"
# attr(,"package")
# [1] "SparkR"
df4
# DataFrame[user:int, act:int, time:date]

请随时回复您可能需要的任何澄清。。。

sessionInfo()
# R version 3.2.2 (2015-08-14)
# Platform: i686-pc-linux-gnu (32-bit)
# Running under: Ubuntu 14.04.2 LTS
# [...]
# other attached packages:
# [1] magrittr_1.5 SparkR_1.5.1

相关内容

  • 没有找到相关文章

最新更新