我有一个大约有10亿个数据点的数据集。我想从中提取大约4600万个数据点。
我想使用Hadoop来提取唯一值,但在Hadoop上不断得到"内存不足"和Java堆大小错误-与此同时,我能够使用Python Set (hashtable,如果你愿意的话)在单个机器上相当容易地运行这个
我正在使用一个相当简单的算法来提取这些唯一的值:我正在解析我的地图中的10亿行,并输出如下所示的行:
UniqValueCount:I a
UniqValueCount:I a
UniqValueCount:I b
UniqValueCount:I c
UniqValueCount:I c
UniqValueCount:I d
,然后运行"aggregate"reducer来获得结果,对于上面的数据集应该是这样的:
I 4
这对于一个小的值集运行得很好,但是当我对10亿个数据点(有4600万个键,正如我提到的)运行它时,工作失败了。
我在Amazon的Elastic Map Reduce上运行这个任务,即使我使用6个m2.4 × large节点(每个节点的最大内存节点为68.4 GB),该任务也会因为"内存不足"错误而失败。
但是我能够使用Python代码在单个m1上使用Set数据结构(哈希表)提取唯一值。大的(一个小得多的盒子,8 GB内存)。我很困惑,Hadoop作业失败了,因为4600万个唯一不应该占用那么多内存。
出了什么问题?我是否使用了UniqValueCount错误?
您可能在shuffle中得到内存错误,请记住Hadoop在启动reducer之前对键进行排序。对于大多数应用程序来说,排序本身并不是必需的,但是Hadoop使用它来聚合属于一个键的所有值。
对于您的示例,您的映射器最终会多次写入相同的值,而您只关心给定键有多少惟一值。这是你现在正在做的:
Mapper output:
I -> a
I -> a
I -> a
I -> a
I -> b
I -> a
I -> b
Reducer input:
I -> [a, a, a, a, b, a, b]
Reducer output:
I -> 2
但在这种情况下,你真的不需要写5*a或2*b,一次就足够了,因为你只关心唯一。因此,您可以通过确保每个值只发送一次,而不是计算reducer中的惟一值,从而直接减少大量开销:
Mapper output:
I -> a
I -> b
Reducer input:
I -> [a, b]
Reducer output:
I -> 2
这将有效地减少网络带宽,并且洗牌将更简单,因为需要排序的键更少。
你可以这样做:
- 在作业中添加一个合并器,它将在映射器之后运行,在reducer之前运行,并且在发送到reducer之前只保持唯一。
- 修改你的映射器,保持你已经发送的映射,如果你之前已经发送过这个映射,不要发送。