我们最近从MapReduce迁移到TEZ,以便在EMR上执行Hive查询。我们看到的情况是,对于确切的 Hive 查询,启动的映射器数量非常不同。请参阅下面的地图 3 阶段。在第一次运行时,它请求了 305 个资源,在另一次运行时,它请求了 4534 个映射器。(请忽略 KILL 状态,因为我手动终止了查询。为什么会这样?我们如何将其更改为基于底层数据大小?
运行 1
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 container KILLED 5 0 0 5 0 0
Map 3 container KILLED 305 0 0 305 0 0
Map 5 container KILLED 16 0 0 16 0 0
Map 6 container KILLED 1 0 0 1 0 0
Reducer 2 container KILLED 333 0 0 333 0 0
Reducer 4 container KILLED 796 0 0 796 0 0
----------------------------------------------------------------------------------------------
VERTICES: 00/06 [>>--------------------------] 0% ELAPSED TIME: 14.16 s
----------------------------------------------------------------------------------------------
运行 2
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 5 5 0 0 0 0
Map 3 container KILLED 4534 0 0 4534 0 0
Map 5 .......... container SUCCEEDED 325 325 0 0 0 0
Map 6 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 container KILLED 333 0 0 333 0 0
Reducer 4 container KILLED 796 0 0 796 0 0
----------------------------------------------------------------------------------------------
VERTICES: 03/06 [=>>-------------------------] 5% ELAPSED TIME: 527.16 s
----------------------------------------------------------------------------------------------
本文介绍了 Tez 分配资源的过程。 https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works
如果为拆分启用了 Tez 分组,则通用分组 逻辑在这些拆分上运行,以将它们分组为更大的拆分。这 这个想法是在处理的并行程度和 每个并行进程中完成了多少工作。
- 首先,Tez 尝试找出群集中用于这些任务的资源可用性。为此,YARN 提供了一个裕量值(和 将来可能会使用其他属性)。假设此值为 T。
- 接下来,Tez 将 T 除以每个任务的资源(例如 M),以找出一个任务(即在单个波中)可以并行运行多少个任务。I c T/M.
- 接下来的W乘以波因数(来自配置 - tez.grouping.split-waves)以确定要使用的任务数。 假设此值为 N。
- 如果总共有 X 个拆分(输入分片)和 N 个任务,那么这将对每个任务的 X/N 拆分进行分组。然后,Tez估计了 基于每个任务的拆分次数的每个任务的数据。
- 如果此值介于 tez.grouping.max-size 和 tez.grouping.min-size 之间,则接受 N 作为任务数。如果 不是,则调整 N 以使每个任务的数据与 最大值/最小值取决于超过的阈值。
- 出于实验目的,可以在配置中设置 tez.grouping.split-count 以指定所需的组数。如果此配置 被指定,则忽略上述逻辑,Tez 尝试分组 拆分为指定数量的组。这是最大的努力。
- 在此之后,执行分组算法。它按节点位置进行分组,然后按机架位置进行分组,同时尊重组大小 限制。