从 Spark 群集收集数据时出现内存不足错误



我知道SO上有很多关于Spark内存不足错误的问题,但我还没有找到我的解决方案。

我有一个简单的工作流程:

  1. 从 Amazon S3 读取 ORC 文件
  2. filter到一小部分行
  3. select列的一小部分
  4. collect到驱动程序节点(所以我可以在R中执行其他操作(

当我运行上述内容然后cache表以激发内存时,它占用 <2GB - 与我的集群可用的内存相比很小 - 然后当我尝试将数据collect到我的驱动程序节点时出现 OOM 错误。

我尝试在以下设置上运行:

  • 具有 32 个内核和 244GB RAM 的计算机上的本地模式
  • 具有 10 个 6.2 GB 执行程序和一个 61GB 驱动程序节点的独立模式

对于其中的每一个,我都使用了许多executor.memorydriver.memorydriver.maxResultSize的配置,以涵盖我可用内存中所有可能的值,但总是在collect阶段出现内存不足错误;java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError : GC overhead limit exceededError in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned.(指示内存问题的sparklyr错误(。

根据我对 Spark 的 [有限] 理解,在收集之前缓存表应该强制所有计算 - 即,如果表在缓存 <2GB 后愉快地坐在内存中,那么我应该不需要超过 2GB 的内存将其收集到驱动程序节点中。

请注意,这个问题的答案有一些我还没有尝试的建议,但这些可能会影响性能(例如序列化RDD(,所以如果可能的话,希望避免使用。

我的问题:

  1. 缓存后占用如此少空间的数据帧如何会导致内存问题?
  2. 在我继续使用可能影响性能的其他选项之前,我是否需要检查/更改/故障排除以帮助解决问题?

谢谢

编辑:请注意,为了回应@Shaido下面的评论,通过 Sparklyr 调用cache"通过在表上执行count(*)来强制将数据加载到内存中"[来自 Sparklyr 文档] - 即表应该坐在内存中并且所有计算在调用collect之前运行(我相信(。

编辑:自遵循以下建议以来的一些其他意见:

  • 根据下面的评论,我现在尝试将数据写入csv,而不是收集以了解可能的文件大小。此操作将创建一组总计 ~3GB 的 csvs,缓存后运行时只需 2 秒。
  • 如果我将driver.maxResultSize设置为 <1G,则会收到一个错误,指出序列化 RDD 的大小为 1030 MB,大于 driver.maxResultSize。
  • 如果我在调用collect后在任务管理器中观察内存使用情况,我看到使用情况一直在上升,直到达到~90GB,此时发生OOM错误。因此,无论出于何种原因,用于执行collect操作的 RAM 量都比我尝试收集的 RDD 大小大 ~100 倍

编辑:根据注释中的要求在下面添加的代码。

#__________________________________________________________________________________________________________________________________
# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________
firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'
library(dplyr)
library(stringr)
library(sparklyr)
#__________________________________________________________________________________________________________________________________
# Configure & connect to spark
#__________________________________________________________________________________________________________________________________
Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") 
config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions
# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g' 
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')
#__________________________________________________________________________________________________________________________________
# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________
#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++
spark_session(sc) %>%
invoke("read") %>% 
invoke("format", "orc") %>%
invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% 
invoke("createOrReplaceTempView", "alldatadf") 
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory
#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++
# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1
# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2) 
# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) 
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)
#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++
# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory
# filter by month and year, using ORC partitions for extra speed
filter(((date_year==year_y1  & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
(date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
(date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
(date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%
# filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%
# filter by advertiser ID
filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & 
!is.na(advertiser_id)) |
((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | 
floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%
# Define cols to keep
transmute(time=as.numeric(event_time/1000000),
user_id=as.character(user_id),
action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
activity_lookup=as.character(activity_id),
sv1=as.character(segment_value_1),
other_data=as.character(other_data))  %>%
mutate(time_char=as.character(from_unixtime(time)))
# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")
#__________________________________________________________________________________________________________________________________
# Collect out of spark
#__________________________________________________________________________________________________________________________________
myDF <- collect(dftbl)

当您说在数据帧上收集时,会发生两件事,

  1. 首先,所有数据都必须写入驱动程序上的输出。
  2. 驱动程序必须从所有节点收集数据并保留在其内存中。

答:

如果您希望仅将数据加载到执行器的内存中,count(( 也是一个操作,它将数据加载到执行器的内存中,该内存可供其他进程使用。

如果要提取数据,请在处理数据"--conf spark.driver.maxResultSize=10g"时尝试此操作以及其他属性。

如上所述,"缓存"不是操作,请检查 RDD 持久性:

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. 
但是"collect">

是一个动作,所有计算(包括"缓存"(都将在调用"collect"时启动。

您在独立模式下运行应用程序,这意味着初始数据加载和所有计算将在同一内存中执行。

数据下载和其他计算使用最多的内存,而不是"收集"。

您可以通过将">

收集"替换为"计数"来检查它。