I'm running a local YARN cluster with a total of 8 vCores and 8 GB of memory.
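The workflow is as follows:
- YarnClient submits an application request, which launches the AppMaster in a container.
- The AppMaster starts, creates an amRMClient and an nmClient, registers itself with the RM, and then creates 4 container requests for worker threads via amRMClient.addContainerRequest.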
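The containers are never allocated, even though there are enough resources available (the onContainersAllocated callback is never called). I checked the NodeManager and ResourceManager logs but didn't see any lines related to the container requests. I followed the Apache documentation closely and don't understand what I'm doing wrong.
Here is the AppMaster code for reference: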
@Override
public void run() {
    Map<String, String> envs = System.getenv();
    String containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.toString());
    if (containerIdString == null) {
        // container id should always be set in the env by the framework
        throw new IllegalArgumentException("ContainerId not set in the environment");
    }
    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
    LOG.info("Starting AppMaster Client...");
    YarnAMRMCallbackHandler amHandler = new YarnAMRMCallbackHandler(allocatedYarnContainers);
    // TODO: get heartbeat interval from config instead of the hard-coded 1000 ms value
    amClient = AMRMClientAsync.createAMRMClientAsync(1000, this);
    amClient.init(config);
    amClient.start();
    LOG.info("Starting AppMaster Client OK");
    //YarnNMCallbackHandler nmHandler = new YarnNMCallbackHandler();
    containerManager = NMClient.createNMClient();
    containerManager.init(config);
    containerManager.start();
    // Get port, URL information. TODO: get tracking url
    String appMasterHostname = NetUtils.getHostname();
    String appMasterTrackingUrl = "/progress";
    // Register self with ResourceManager. This will start heart-beating to the RM
    RegisterApplicationMasterResponse response = null;
    LOG.info("Register AppMaster on: " + appMasterHostname + "...");
    try {
        response = amClient.registerApplicationMaster(appMasterHostname, 0, appMasterTrackingUrl);
    } catch (YarnException | IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return;
    }
    LOG.info("Register AppMaster OK");
    // Dump out information about cluster capability as seen by the resource manager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
    containerMemory = Integer.parseInt(config.get(YarnConfig.YARN_CONTAINER_MEMORY_MB));
    containerCores = Integer.parseInt(config.get(YarnConfig.YARN_CONTAINER_CPU_CORES));
    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
        LOG.info("Container memory specified above max threshold of cluster."
                + " Using max value." + ", specified=" + containerMemory + ", max="
                + maxMem);
        containerMemory = maxMem;
    }
    if (containerCores > maxVCores) {
        LOG.info("Container virtual cores specified above max threshold of cluster."
                + " Using max value." + ", specified=" + containerCores + ", max=" + maxVCores);
        containerCores = maxVCores;
    }
    List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts();
    LOG.info("Received " + previousAMRunningContainers.size()
            + " previous AM's running containers on AM registration.");
    for (int i = 0; i < 4; ++i) {
        ContainerRequest containerAsk = setupContainerAskForRM();
        amClient.addContainerRequest(containerAsk); // NOTHING HAPPENS HERE...
        LOG.info("Available resources: " + amClient.getAvailableResources().toString());
    }
    while (completedYarnContainers != 4) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    LOG.info("Done with allocation!");
}
@Override
public void onContainersAllocated(List<Container> containers) {
    LOG.info("Got response from RM for container ask, allocatedCnt=" + containers.size());
    for (Container container : containers) {
        LOG.info("Allocated yarn container with id: " + container.getId());
        allocatedYarnContainers.push(container);
        // TODO: Launch the container in a thread
    }
}
@Override
public void onError(Throwable error) {
    LOG.error(error.getMessage());
}
@Override
public float getProgress() {
    return (float) completedYarnContainers / allocatedYarnContainers.size();
}
Here is the output of jps:
14594 NameNode
15269 DataNode
17975 Jps
14666 ResourceManager
14702 NodeManager
Here is the AppMaster log for initialization and the 4 container requests:
23:47:09 YarnAppMaster - Starting AppMaster Client OK
23:47:09 YarnAppMaster - Register AppMaster on: andrei-mbp.local/192.168.1.4...
23:47:09 YarnAppMaster - Register AppMaster OK
23:47:09 YarnAppMaster - Max mem capabililty of resources in this cluster 2048
23:47:09 YarnAppMaster - Max vcores capabililty of resources in this cluster 2
23:47:09 YarnAppMaster - Received 0 previous AM's running containers on AM registration.
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Progress indicator should not be negative
Thanks in advance.
I suspect the problem comes precisely from the negative progress:
23:47:11 YarnAppMaster - Progress indicator should not be negative
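Note that since you're using AMRMClientAsync, requests are not made immediately when you call addContainerRequest. There is actually a heartbeat function that runs periodically, and it is inside this function that allocate is called and the pending requests are sent. The progress value used by this function starts at 0, but it is updated with the value returned by your handler once a response has been received.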
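To make the deferred-request behaviour concrete, here is a deliberately simplified model of the flow just described. It is not Hadoop's AMRMClientAsyncImpl: the class HeartbeatModel and its methods are hypothetical, the heartbeat is driven by hand instead of by a timer thread, and the progress check only approximates the one the real client applies before allocating. It shows three things: addContainerRequest merely queues an ask, asks leave only on a heartbeat, and progress is refreshed from the handler after each response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical, simplified model of an async AM-RM client (not the real Hadoop class). */
final class HeartbeatModel {
    private final List<String> pendingAsks = new ArrayList<>();
    private final Supplier<Float> progressSource; // stands in for the handler's getProgress()
    private float progress = 0.0f;                // 0 until the first response is handled
    private int asksSentToRm = 0;

    HeartbeatModel(Supplier<Float> progressSource) {
        this.progressSource = progressSource;
    }

    /** Like addContainerRequest: only queues the ask, nothing is sent yet. */
    public void addContainerRequest(String ask) {
        pendingAsks.add(ask);
    }

    /** One heartbeat: "allocate" with the current progress, then refresh it from the handler. */
    public void heartbeat() {
        // Written as !(x >= 0) so that NaN is rejected too (assumed to mirror the real check).
        if (!(progress >= 0)) {
            throw new IllegalArgumentException("Progress indicator should not be negative");
        }
        asksSentToRm += pendingAsks.size(); // pending asks go out only here
        pendingAsks.clear();
        progress = progressSource.get();    // updated after the response is processed
    }

    public int pendingCount() { return pendingAsks.size(); }

    public int sentCount() { return asksSentToRm; }
}
```

With the question's formula plugged in as the progress source, the first heartbeat succeeds with progress 0, the refreshed progress becomes NaN, and the next heartbeat throws before any of the four queued asks are sent.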
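The first allocate call happens right after registration, so getProgress is called at that point and the stored progress is updated. As it happens, your progress gets updated to NaN: at that moment allocatedYarnContainers is empty and completedYarnContainers is 0, so the value you return is the result of 0/0, which is undefined. When the next allocate call checks your progress value, the check fails, because NaN returns false in every comparison. As a result, the rest of the allocate function never actually talks to the ResourceManager, because it bails out at the very first step with an exception.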
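The two Java facts the explanation relies on can be checked in isolation: float division by zero produces NaN rather than an exception, and every ordered comparison involving NaN is false. As far as I know, Hadoop's AMRMClientImpl.allocate guards the progress value with a check equivalent to `progressIndicator >= 0`, which NaN fails. The class name NanProgressDemo is hypothetical; questionProgress just reuses the expression from the question's getProgress().

```java
/** Hypothetical demo of why the question's getProgress() yields NaN instead of throwing. */
final class NanProgressDemo {
    /** Same expression shape as the question's getProgress(). */
    public static float questionProgress(int completed, int allocatedSize) {
        // The cast binds to 'completed' only, so this is float division:
        // 0.0f / 0 evaluates to NaN rather than throwing ArithmeticException.
        return (float) completed / allocatedSize;
    }

    public static void main(String[] args) {
        float p = questionProgress(0, 0);
        System.out.println(Float.isNaN(p)); // true
        System.out.println(p >= 0);         // false: ordered comparisons with NaN are always false
        System.out.println(p < 0);          // also false, so NaN is neither negative nor non-negative
        try {
            int unused = 0 / Integer.parseInt("0"); // int division by zero, for contrast
            System.out.println(unused);
        } catch (ArithmeticException e) {
            System.out.println("int 0/0 throws: " + e.getMessage()); // int 0/0 throws: / by zero
        }
    }
}
```

This is why nothing fails inside getProgress() itself: the bad value is only detected later, on the heartbeat thread, which then reports the error through onError.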
Try changing your progress function to the following:
@Override
public float getProgress() {
    return (float) allocatedYarnContainers.size() / 4.0f;
}
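This version avoids the 0/0 case because the denominator is the fixed number of requested containers. If you would rather report completion than allocation, a slightly more defensive variant is sketched below. The helper SafeProgress.progress and its total parameter are hypothetical (in the question, total would be 4); all it guarantees is that the result is never NaN and stays within [0, 1], the range progress is conventionally reported in.

```java
/** Hypothetical helper: a progress value that is never NaN and always within [0, 1]. */
final class SafeProgress {
    /**
     * @param done  containers finished (or allocated, if you follow the fix above)
     * @param total containers the AM intends to run; assumed to be known up front
     */
    public static float progress(int done, int total) {
        if (total <= 0) {
            return 0.0f; // nothing planned yet: report 0 instead of dividing by zero
        }
        float p = (float) done / total;
        return Math.max(0.0f, Math.min(1.0f, p)); // clamp into [0, 1]
    }
}
```

Inside the AppMaster, getProgress() could then return `SafeProgress.progress(completedYarnContainers, 4)`, which reports 0 until containers finish and reaches 1 once all four have completed.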
(Note: copied to StackOverflow for posterity.)
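Thanks to Alexandre Fonseca for pointing out that getProgress() returns NaN (a division by zero) when it is called before the first allocation, which makes the client's allocate call fail immediately with an exception, so the container requests never reach the ResourceManager.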