嗨,我是Hadoop和mapreduce的新手。我想知道这样的事情是否可能发生。我试图通过Mapreduce比较两个文件。第一个文件可能看起来像这样:
t1 r1
t2 r2
t1 r4
第二个文件看起来像这样:
u1 t1 r1
u2 t2 r3
u3 t2 r2
u4 t1 r1
我想让它根据文件发射u1
, u3
和u4
。第二个文件将比第一个文件大得多。我不太确定如何比较这些文件;这在一个MapReduce作业中可行吗?如果有必要,我愿意链接MapReduce作业。
您可以通过将第一个文件放在分布式缓存中,并在映射阶段遍历第二个文件来执行连接来执行mapside连接。
如何从分布式缓存中读取:
@Override
protected void setup(Context context) throws IOException,InterruptedException
{
Path[] filelist=DistributedCache.getLocalCacheFiles(context.getConfiguration());
for(Path findlist:filelist)
{
if(findlist.getName().toString().trim().equals("mapmainfile.dat"))
{
fetchvalue(findlist,context);
}
}
}
public void fetchvalue(Path realfile,Context context) throws NumberFormatException, IOException
{
BufferedReader buff=new BufferedReader(new FileReader(realfile.toString()));
//some operations with the file
}
如何添加文件到分布式缓存:
DistributedCache.addCacheFile(new URI("/user/hduser`/test/mapmainfile.dat"),conf);`
可以使用映射器侧连接进行比较。使用分布式缓存将较小的文件传递给所有映射器,并通过映射器逐条读取较大的文件。
现在您可以很容易地比较收到的大文件记录和小文件记录(来自分布式缓存),并发出匹配的记录。
注意:只有当第一个文件足够小到适合映射器的内存时,这才会起作用。通常是目录文件或查找文件
如果两个文件都很大,可以使用reduce side join:
- 使用MultipleInput格式为这两个文件分别创建两个映射器。一个输入文件到一个映射器,另一个文件到另一个映射器。
- 发送第一个映射器输出数据与一个键作为组合键(TextPair)。第一部分是"t1 r1","t1, r2"等,第二部分是第一个映射器的"0",第二个映射器的"1"。对于值,从第一个映射器发出nullWritable,从第二个映射器发出u1, u2等。因此,第一个映射器的输出将如(("t1 r1","0"),null),而第二个映射器的输出将如(("t1 r1","1"),u1),(("t1 r1","1"),u4)等,使用第一个映射器的"0",这样第一个映射器的输出将首先被接收。
- 基于TextPair键的第一部分实现分区器和组比较器
- 在减速器中,您将获得按第一部分分组的数据,并像这样接收它-[("t1 r1",0"),null),(("t1 r1",1"),u1),(("t1 r1",1"),u4)]
- 丢弃所有没有"0"键的输入(因此它将删除不匹配的条目),并发出其余的值u1, u4等。