>我有一个映射器,可以读取输入并写入数据库。我想限制实际转换和写入该数据库的输入数量,并且所有映射器都必须为限制做出贡献,然后在达到该限制后停止(大约;一两个额外的没什么大不了的。
我在映射器上实现了一个限制器函数,该函数询问其他任务,"您导入了多少条记录?一旦达到给定的限制,它将停止导入这些记录(尽管它将继续出于其他目的处理它们)。
有问题的地图代码如下所示:
public void map(ImmutableBytesWritable key, Result row, Context context) {
// prepare the input
// ...
if (context.getCounter(Metrics.IMPORTED).getValue()<IMPORT_LIMIT){
importRecord();
context.getCounter(Metrics.IMPORTED).increment(1l);
}
// do other things
// ...
}
因此,每个映射器都会检查是否有更多空间可以导入,并且只有在未达到限制时才会执行任何导入。但是,每个映射器本身都会导入到限制,因此对于 16 个映射器,我们导入了 16*IMPORT_LIMIT 条记录。它肯定在做一些限制(计数远低于导入记录的正常数量。
计数器值何时推送到其他映射器,或者它们是否可供每个映射器使用?我实际上可以从计数器中获取一些实时值,还是仅在映射器完成后更新?有没有更好的方法在映射器之间共享值?
好的:据我所知,在工作完成之前,MapReduce不会在映射器之间共享计数器(即根本不共享)。我不确定中途提交的映射器是否会允许以后的映射器看到他们的计数器,但它不够可靠,无法实时完成。
相反,我要做的是运行一个简单的Java应用程序,该应用程序自行迭代行并写入列,现有的MapReduce作业将使用该列来确定它是否应该导入该行。