这个问题与R中数据量非常大的其他问题类似,但我找不到如何先合并/联接两个数据框、然后再对其执行计算的示例(而不是读取大量数据框并用mclapply进行计算)。这里的问题不在于加载数据(大约需要20分钟,但确实能加载),而在于合并与汇总。
我已经尝试了能找到的所有data.table方法、各种类型的联接以及ff包,但仍然遇到vecseq的2^31行上限问题。现在我正在尝试用multidplyr并行执行,但无法找出错误的来源。
数据框:species_data # df,约6500万行,列为 c("id", "species_id");lookup # df,约1700万行,列为 c("id", "cell_id", "rgn_id")。并非lookup中的所有id都出现在species_data中。
## make sample dataframes:
library(stringi)

# Lookup table: one row per cell. `id` is the join key; store it as
# character so it has the same type as species_data$id built below
# (dplyr joins fail on character-vs-numeric key columns).
lookup <- data.frame(id = as.character(seq(2001, 2500, by = 1)),
                     cell_id = seq(1, 500, by = 1),
                     rgn_id = seq(801, 1300, by = 1))

# Random species codes of the form "ABC-12345".
species_id <- sprintf("%s-%s",
                      stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
                      stri_rand_strings(n = 1000, length = 5, pattern = "[1-9]"))

# Sample ids from the lookup key range so the join can actually match.
# (The original built 3-character ids "200".."249", which could never
# match lookup$id 2001..2500, so every joined row came out NA.)
id <- as.character(sample(seq(2001, 2500, by = 1), size = 1000, replace = TRUE))

species_data <- data.frame(species_id, id)
使用 multidplyr 合并和联接数据框:
library(tidyverse)

# Only install multidplyr when it is missing: unconditional
# install.packages()/install_github() calls re-download the package on
# every run and break the script on machines without network access.
if (!requireNamespace("multidplyr", quietly = TRUE)) {
  install.packages("devtools")
  devtools::install_github("hadley/multidplyr")
}
library(devtools)
library(multidplyr)
library(parallel)
## NOTE(review): this is the failing attempt from the question. Two
## separate problems are visible here:
## 1. partition() in current multidplyr only accepts a cluster; the
##    grouping must be set with group_by() beforehand -- hence the
##    "unused argument (species_id)" error reported below.
## 2. left_join(species_data, lookup, by = "id") inside a pipe also
##    receives the piped data as its first argument; it should read
##    left_join(lookup, by = "id").
## Also, no object named `cluster` is created anywhere in this script.
species_summary <- species_data %>%
# partition the species data by species id
partition(species_id, cluster = cluster) %>%
left_join(species_data, lookup, by = "id") %>%
dplyr::select(-id) %>%
group_by(species_id) %>%
## total number of cells each species occurs in
mutate(tot_count_cells = n_distinct(cell_id)) %>%
ungroup() %>%
dplyr::select(c(cell_id, species_id, rgn_id, tot_count_cells)) %>%
group_by(rgn_id, species_id) %>%
## number of cells each species occurs in each region
summarise(count_cells_eez = n_distinct(cell_id)) %>%
collect() %>%
as_tibble()
## Error in partition(., species_id, cluster = cluster) : unused argument (species_id)
## If I change to:
## NOTE(review): incomplete snippet ("...") kept as posted; not runnable
## as-is.
species_summary <- species_data %>%
group_by(species_id) %>%
partition(cluster = cluster) %>% ...
## get, "Error in worker_id(data, cluster) : object 'cluster' not found
## NOTE(review): the second error is simply that `cluster` was never
## defined -- a cluster object must be created first (e.g. with
## multidplyr::new_cluster(), as the answer below does) and passed in.
这是我第一次尝试并行和大数据,我正在努力诊断错误。
谢谢!
首先,我加载 dplyr 和 multidplyr:
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(multidplyr)
## Create a cluster of 3 worker processes for multidplyr; this object is
## what gets passed to partition() below.
my_clusters <- new_cluster(3) # I have 4 cores
然后我加载与你建议的相同的数据
library(stringi)
library(stringi)

# Cell lookup table. `id` is stored as character so its type matches
# species_data$id for the join below.
lookup <- tibble(
  id = as.character(seq(2001, 2500, by = 1)),
  cell_id = seq(1, 500, by = 1),
  rgn_id = stri_rand_strings(n = 500, length = 3, pattern = "[0-9]")
)

# Species codes of the form "ABC-12345".
species_id <- paste0(
  stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
  "-",
  stri_rand_strings(n = 1000, length = 5, pattern = "[1-9]")
)

# NOTE(review): these ids are three characters ("200".."249") while
# lookup$id has four ("2001".."2500"), so the join below matches
# nothing -- which is why the printed results further down contain only
# NA cell_id/rgn_id values.
id <- paste0(
  stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]")
)

species_data <- tibble(species_id, id)
检查结果
species_data
#> # A tibble: 1,000 x 2
#> species_id id
#> <chr> <chr>
#> 1 CUZ-98293 246
#> 2 XDG-61673 234
#> 3 WFZ-94338 230
#> 4 UIH-97549 226
#> 5 AGE-35257 229
#> 6 BMD-75361 249
#> 7 MJB-78799 226
#> 8 STS-15141 225
#> 9 RXD-18645 245
#> 10 SKZ-58666 243
#> # ... with 990 more rows
lookup
#> # A tibble: 500 x 3
#> id cell_id rgn_id
#> <chr> <dbl> <chr>
#> 1 2001 1 649
#> 2 2002 2 451
#> 3 2003 3 532
#> 4 2004 4 339
#> 5 2005 5 062
#> 6 2006 6 329
#> 7 2007 7 953
#> 8 2008 8 075
#> 9 2009 9 008
#> 10 2010 10 465
#> # ... with 490 more rows
现在我可以用并行方法运行代码了。我根据两次分组,把 dplyr 代码分成两步:
## Step 1: join the lookup onto the species data, then count, per
## species, the number of distinct cells it occurs in. group_by() is
## applied BEFORE partition(my_clusters), matching the current
## multidplyr API (partition() itself takes only the cluster).
## NOTE(review): cell_id/rgn_id print as NA below because none of the
## sample species_data$id values appear in lookup$id.
first_step <- species_data %>%
left_join(lookup, by = "id") %>%
select(-id) %>%
group_by(species_id) %>%
partition(my_clusters) %>%
mutate(tot_count_cells = n_distinct(cell_id)) %>%
collect() %>%
ungroup()
first_step
#> # A tibble: 1,000 x 4
#> species_id cell_id rgn_id tot_count_cells
#> <chr> <dbl> <chr> <int>
#> 1 UIH-97549 NA <NA> 1
#> 2 BMD-75361 NA <NA> 1
#> 3 STS-15141 NA <NA> 1
#> 4 RXD-18645 NA <NA> 1
#> 5 HFI-78676 NA <NA> 1
#> 6 KVP-45194 NA <NA> 1
#> 7 SGW-29988 NA <NA> 1
#> 8 WBI-79521 NA <NA> 1
#> 9 MFY-86277 NA <NA> 1
#> 10 BHO-37621 NA <NA> 1
#> # ... with 990 more rows
和
## Step 2: regroup by region and species, partition across the same
## cluster, and count the distinct cells per species per region.
## collect() brings the per-worker results back; ungroup() drops the
## residual grouping before further use.
second_step <- first_step %>%
group_by(rgn_id, species_id) %>%
partition(my_clusters) %>%
summarise(count_cells_eez = n_distinct(cell_id)) %>%
collect() %>%
ungroup()
second_step
#> # A tibble: 1,000 x 3
#> rgn_id species_id count_cells_eez
#> <chr> <chr> <int>
#> 1 <NA> ABB-24645 1
#> 2 <NA> ABY-98559 1
#> 3 <NA> AEQ-42462 1
#> 4 <NA> AFO-58569 1
#> 5 <NA> AKQ-44439 1
#> 6 <NA> AMF-23978 1
#> 7 <NA> ANF-49159 1
#> 8 <NA> APD-85367 1
#> 9 <NA> AQH-64126 1
#> 10 <NA> AST-77513 1
#> # ... with 990 more rows
由 reprex 包 (v0.3.0) 于 2020-03-21 创建