在映射器输出大部分排序时最小化随机播放



我有一个map-reduce过程,其中映射器从按键排序的文件中获取输入。例如:

1 ...
2 ...
2 ...
3 ...
3 ...
3 ...
4 ...

然后它被转换,99.9% 的键彼此保持相同的顺序,其余 99% 的键是接近的。因此,以下内容可能是对上述数据运行映射任务的输出:

a ...
c ...
c ...
d ...
e ...
d ...
e ...

因此,如果你能确保化简器接收一系列输入,并将该化简器放在大多数输入已经所在的同一节点上,那么洗牌只需要很少的数据传输。例如,假设我对数据进行了分区,以便 a-d 由一个化简器处理,e-g 由下一个化简器处理。然后,如果 a-d 可以在处理 1-4 映射的同一节点上运行,则只需要通过网络发送两条 e 记录。

如何构建一个利用数据这一属性的系统?我有Hadoop和Spark可用,不介意编写自定义分区程序等。然而,完整的工作负载是MapReduce的一个经典例子,我想坚持使用一个支持这种范式的框架。

Hadoop邮件档案提到了对这种优化的考虑。是否需要修改框架本身来实现它?

SPARK 的角度来看,没有直接支持这一点:最接近的是 mapPartitions with preservePartions=true。 但是,这对您的情况没有直接帮助,因为密钥可能不会更改。

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }

如果您能够明确地知道没有键会移动到其原始分区之外,则上述内容将起作用。但边界上的值可能不会合作。

与迁移密钥相比,数据的规模是多少? 您可以考虑添加后处理步骤。首先为所有迁移密钥构造一个分区。映射器将为需要迁移的键输出一个特殊的键值。 然后对结果进行后处理,以对标准分区进行某种追加。 这是额外的麻烦,因此您需要在额外的步骤和管道复杂性中评估权衡。

最新更新