我在EMR 4.3上使用Spark 1.6来查询属于hive metastore中一个表的约15TB的数据(由S3中的gzip parquet文件支持)。对于我的集群,我有一个r3.8xlarge主节点和15个r3.8xlarge核心节点(3.6TB RAM, 9.6TB SSD)。
约15TB的数据包含在大约90亿行中。每行有~15列存储长度为5-50的字符串,还有一列包含~30个字符串的数组,每个字符串包含10-20个字符。数组中只存储了~ 100万个唯一字符串。我所要做的就是计算数组列中的唯一字符串,但似乎我正在耗尽内存,因为我不断得到:OutOfMemoryError:无法在执行器上创建新的本机线程。由于内存不足导致任务失败,执行器被禁用,然后作业失败。
它工作时,我查询5-10TB的数据。我一定不能正确地理解存储在内存中的东西(这就是我想弄清楚的)。顺便说一句,对于上面的集群,我设置:
spark.executor.memory 30g
spark.executor.cores 5
spark.executor.instances 90 // 6 instances per r3.8xlarge host
我不认为Spark SQL将中间表存储在内存中。由于唯一字符串的个数不超过1M,所以我认为包含其计数的字符串应该可以很容易地放入内存中。下面是查询:
val initial_df = sqlContext.sql("select unique_strings_col from Table where timestamp_partition between '2016-09-20T07:00:00Z' and '2016-09-23T07:00:00Z'")
initial_df.registerTempTable("initial_table") // ~15TB compressed data to read in from S3
val unique_strings_df = sqlContext.sql("select posexplode(unique_strings_col) as (string_pos, string) from initial_table").select($"string_pos", $"string")
unique_strings_df.registerTempTable("unique_strings_table") // ~70% initial data remaining at this point
val strings_count_df = sqlContext.sql("select string, count(*) as unique_string_count from unique_strings_table where string_pos < 21 group by string order by unique_string_count desc") // ~50% initial data remaining at this point
strings_count_df.write.parquet("s3://mybucket/counts/2016-09-20-2016-09-23")
压缩的拼花文件很小(比如每个5mb)。似乎它们可以一次读取一个,过滤,并与它们的计数一起存储。我错过了什么?
所以事实证明,我需要有足够的磁盘和内存空间来存储初始RDD。如果在创建临时表之前在初始RDD中进行更多的预先过滤,就能够成功运行查询。耶!