How to join, group and summarise large data frames in R using multithreading and parallel processing



This question is similar to other questions about very large data in R, but I can't find an example of how to merge/join two dfs and then perform calculations on them (as opposed to reading in many data frames and doing the calculations with mclapply). The problem here is not loading the data (it takes about 20 minutes, but they do load); it is the merge & summarise.

I have tried every data.table approach I could find, different types of joins, and ff, but I still run into the problem of vecseq being limited to 2^31 rows. Now I am trying to run it in parallel with multidplyr, but I cannot work out where the errors are coming from.
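
To make the 2^31 limit concrete, here is a rough sketch of the kind of data.table join + summarise I have been attempting (illustrative only, not my exact code; it assumes the column names of the two data frames described below). On the full-sized tables the join step itself fails with the vecseq error because the result would exceed 2^31 rows:

library(data.table)
setDT(species_data)
setDT(lookup)

## join lookup onto species_data by id, then count distinct cells
## per region and species; on the full-sized tables the join result
## exceeds the 2^31-row vecseq limit and errors
species_summary <- lookup[species_data, on = "id", allow.cartesian = TRUE][
  , .(count_cells_eez = uniqueN(cell_id)), by = .(rgn_id, species_id)]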

The data frames:

species_data   # df of ~65 million rows, columns <- c("id", "species_id")
lookup         # df of ~17 million rows, columns <- c("id", "cell_id", "rgn_id")

Not all of the ids in lookup appear in species_data.

## make sample dataframes:
lookup <- data.frame(id = seq(2001, 2500, by = 1), 
                     cell_id = seq(1, 500, by = 1), 
                     rgn_id = seq(801, 1300, by = 1))

library(stringi)
species_id <- sprintf("%s%s%s",
                      stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
                      pattern = "-",
                      stri_rand_strings(1000, length = 5, '[1-9]'))
id <- sprintf("%s%s%s",
              stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
              stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
              stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]"))
species_data <- data.frame(species_id, id)

Merging and joining the dfs using multidplyr

library(tidyverse)
install.packages("devtools")
library(devtools)
devtools::install_github("hadley/multidplyr") 
library(multidplyr)
library(parallel)

species_summary <- species_data %>%
  # partition the species data by species id
  partition(species_id, cluster = cluster) %>%
  left_join(species_data, lookup, by = "id") %>%
  dplyr::select(-id) %>%
  group_by(species_id) %>%
  ## total number of cells each species occurs in
  mutate(tot_count_cells = n_distinct(cell_id)) %>%
  ungroup() %>%
  dplyr::select(c(cell_id, species_id, rgn_id, tot_count_cells)) %>%
  group_by(rgn_id, species_id) %>% 
  ## number of cells each species occurs in each region
  summarise(count_cells_eez = n_distinct(cell_id)) %>% 
  collect() %>%
  as_tibble()
## Error in partition(., species_id, cluster = cluster) : unused argument (species_id)
## If I change to:
species_summary <- species_data %>%
  group_by(species_id) %>%
  partition(cluster = cluster) %>% ...
## get, "Error in worker_id(data, cluster) : object 'cluster' not found

This is my first attempt at parallel processing and big data, and I am struggling to diagnose the errors.

Thanks!

First I load dplyr and multidplyr

library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(multidplyr)
my_clusters <- new_cluster(3) # I have 4 cores
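
If you do not know the number of cores in advance, a small sketch of how you could size the cluster instead (only parallel::detectCores() is assumed beyond what is loaded above):

## alternative: derive the worker count from the machine, leaving one core free
n_workers <- max(1, parallel::detectCores() - 1)
my_clusters <- new_cluster(n_workers)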

Then I load the same data that you suggested

library(stringi)
lookup <- tibble(
  id = as.character(seq(2001, 2500, by = 1)),
  cell_id = seq(1, 500, by = 1),
  rgn_id = sprintf("%s", stri_rand_strings(n = 500, length = 3, pattern = "[0-9]"))
)
species_id <- sprintf(
  "%s%s%s", 
  stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
  pattern = "-",
  stri_rand_strings(n = 1000, length = 5, "[1-9]")
)
id <- sprintf(
  "%s%s%s", 
  stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]")
)
species_data <- tibble(species_id, id)

Check the results

species_data
#> # A tibble: 1,000 x 2
#>    species_id id   
#>    <chr>      <chr>
#>  1 CUZ-98293  246  
#>  2 XDG-61673  234  
#>  3 WFZ-94338  230  
#>  4 UIH-97549  226  
#>  5 AGE-35257  229  
#>  6 BMD-75361  249  
#>  7 MJB-78799  226  
#>  8 STS-15141  225  
#>  9 RXD-18645  245  
#> 10 SKZ-58666  243  
#> # ... with 990 more rows
lookup
#> # A tibble: 500 x 3
#>    id    cell_id rgn_id
#>    <chr>   <dbl> <chr> 
#>  1 2001        1 649   
#>  2 2002        2 451   
#>  3 2003        3 532   
#>  4 2004        4 339   
#>  5 2005        5 062   
#>  6 2006        6 329   
#>  7 2007        7 953   
#>  8 2008        8 075   
#>  9 2009        9 008   
#> 10 2010       10 465   
#> # ... with 490 more rows

Now I can run the code using a multithreaded approach. I split the dplyr code into two steps based on the two group_bys in your code.

first_step <- species_data %>% 
  left_join(lookup, by = "id") %>% 
  select(-id) %>% 
  group_by(species_id) %>% 
  partition(my_clusters) %>% 
  mutate(tot_count_cells = n_distinct(cell_id)) %>% 
  collect() %>% 
  ungroup()
first_step
#> # A tibble: 1,000 x 4
#>    species_id cell_id rgn_id tot_count_cells
#>    <chr>        <dbl> <chr>            <int>
#>  1 UIH-97549       NA <NA>                 1
#>  2 BMD-75361       NA <NA>                 1
#>  3 STS-15141       NA <NA>                 1
#>  4 RXD-18645       NA <NA>                 1
#>  5 HFI-78676       NA <NA>                 1
#>  6 KVP-45194       NA <NA>                 1
#>  7 SGW-29988       NA <NA>                 1
#>  8 WBI-79521       NA <NA>                 1
#>  9 MFY-86277       NA <NA>                 1
#> 10 BHO-37621       NA <NA>                 1
#> # ... with 990 more rows

second_step <- first_step %>% 
  group_by(rgn_id, species_id) %>% 
  partition(my_clusters) %>% 
  summarise(count_cells_eez = n_distinct(cell_id)) %>% 
  collect() %>% 
  ungroup()
second_step
#> # A tibble: 1,000 x 3
#>    rgn_id species_id count_cells_eez
#>    <chr>  <chr>                <int>
#>  1 <NA>   ABB-24645                1
#>  2 <NA>   ABY-98559                1
#>  3 <NA>   AEQ-42462                1
#>  4 <NA>   AFO-58569                1
#>  5 <NA>   AKQ-44439                1
#>  6 <NA>   AMF-23978                1
#>  7 <NA>   ANF-49159                1
#>  8 <NA>   APD-85367                1
#>  9 <NA>   AQH-64126                1
#> 10 <NA>   AST-77513                1
#> # ... with 990 more rows

Created on 2020-03-21 by the reprex package (v0.3.0)
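
As a quick sanity check, you could compute the same final summary with plain single-threaded dplyr and compare it with second_step; a minimal sketch, assuming the objects created above:

## single-threaded reference result
check <- species_data %>% 
  left_join(lookup, by = "id") %>% 
  select(-id) %>% 
  group_by(rgn_id, species_id) %>% 
  summarise(count_cells_eez = n_distinct(cell_id)) %>% 
  ungroup()

## row order differs after collect(), so sort both before comparing
all.equal(
  arrange(second_step, rgn_id, species_id),
  arrange(check, rgn_id, species_id)
)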
