我正在使用SparkR 1.6,我有一个数百万行的数据帧。df 的列之一,名为 « 类别 »,包含具有以下模式的字符串:
categories
1 cat1,cat2,cat3
2 cat1,cat2
3 cat3, cat4
4 cat5
我想拆分每个字符串并创建« n »新列,其中« n»是可能的类别数(此处n = 5,但实际上可能超过50)。
每个新列将包含一个布尔值,用于表示类别的存在/不存在,例如:
cat1 cat2 cat3 cat4 cat5
1 TRUE TRUE TRUE FALSE FALSE
2 TRUE TRUE FALSE FALSE FALSE
3 FALSE FALSE TRUE TRUE FALSE
4 FALSE FALSE FALSE FALSE TRUE
如何仅使用 sparkR API 执行此操作?
谢谢你的时间。
问候。
让我们从导入和虚拟数据开始:
library(magrittr)
df <- createDataFrame(sqlContext, data.frame(
categories=c("cat1,cat2,cat3", "cat1,cat2", "cat3,cat4", "cat5")
))
单独的字符串:
separated <- selectExpr(df, "split(categories, ',') AS categories")
获取不同的类别:
categories <- select(separated, explode(separated$categories)) %>%
distinct() %>%
collect() %>%
extract2(1)
构建表达式列表:
exprs <- lapply(
categories, function(x)
alias(array_contains(separated$categories, x), x)
)
选择并检查结果
select(separated, exprs) %>% head()
## cat1 cat2 cat3 cat4 cat5
## 1 TRUE TRUE TRUE FALSE FALSE
## 2 TRUE TRUE FALSE FALSE FALSE
## 3 FALSE FALSE TRUE TRUE FALSE
## 4 FALSE FALSE FALSE FALSE TRUE
不使用SparkR::collect()
的纯Spark解决方案。如果给定 Spark 数据帧的列具有一定数量的分隔符,以下是我的解决方案,具有以下假设:
# separator = '::'
# number of separators = 3
# name of the respective column = col
首先,您必须使用拆分列创建输出数据帧的架构:
AddFieldsToSchema = function(existingSchema, newFieldNames, newFieldTypes) {
# This somewhat tortured syntax is necessary because the existingSchema
# variable is actually a Java object under the hood
existingNames = unlist(lapply(existingSchema$fields(), function(field) {
field$name()
}))
existingTypes = unlist(lapply(existingSchema$fields(), function(field) {
field$dataType.simpleString()
}))
combinedNames = c(existingNames, newFieldNames)
combinedTypes = c(existingTypes, newFieldTypes)
return(CreateSchema(combinedNames, combinedTypes))
}
num_separator = 3
sdf_schema = SparkR::schema(sdf) %>%
AddFieldsToSchema(paste0('col_', seq(1, num_separator)),
c(rep('string', num_separator)))
然后,您将在 SparkR::d apply 中使用的给定列的拆分函数:
my_func = function(x) {cbind(x, stringr::str_split_fixed(x$col, '::', 3))}
sdf_split = sdf %>%
SparkR::dapply(my_func, df_schema)