获取远程进程组详细信息时出现NiFi Rest API问题



使用下面提到的NiFi Rest API端点和代码片段,

我正在获取远程进程组(RPG(的列表,迭代并获取每个RPG的详细信息。问题是,我得到了不准确的RPG数据。如果我达到了此端点(https://nifihost:8080/nifi-api/remote-process-groups/{id}(,我将收到准确的详细信息。请澄清,

  1. 为什么这两个结果之间存在差异终点?(https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groupsVs。https://nifihost:8080/nifi-api/remote-process-groups/{id}(
  2. 由于我的要求是通过每个Process Group进行迭代其中的远程进程组(RPG(,并获取每个RPG的详细信息?什么是正确的方法实现这一目标

终点:

https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups

源代码

ArrayList<NifiRemoteProcessGroup> remoteProcessGroupArrayList = new ArrayList<>();
String returnedJSON = "";
String remoteProcessGroupURL =  getNifiURL() + "/nifi-api/process-groups/" + processGroup + "/remote-process-groups";
HttpEntity httpEntity = RestCall.oAuthHeaders(token);
RestTemplate restTemplate = new RestTemplate();
try{
ResponseEntity<String> response = restTemplate.exchange(remoteProcessGroupURL,HttpMethod.GET,httpEntity,String.class);
returnedJSON = response.getBody();
}
catch(Exception e){
logger.error("There was an error retrieving the remote-process-groups : " + e.getMessage());
}
try{
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(returnedJSON);
JsonNode processorNode = rootNode.path("remoteProcessGroups");
Iterator<JsonNode> elements = processorNode.elements();
while(elements.hasNext()){
JsonNode remoteProcessGroup = elements.next();
JsonNode statusElement = remoteProcessGroup.path("status");
JsonNode bulletinElement = remoteProcessGroup.path("bulletins");
JsonNode componentElement = remoteProcessGroup.path("component");
JsonNode aggregateSnapshot = statusElement.path("aggregateSnapshot");
NifiRemoteProcessGroup remoteProcessGroupInstance = new NifiRemoteProcessGroup();
remoteProcessGroupInstance.setRemoteProcessGroupId(checkExists(statusElement,"id"));
remoteProcessGroupInstance.setRemoteProcessGroupName(checkExists(componentElement,"name"));
remoteProcessGroupInstance.setRemoteProcessGroupGroupId(checkExists(statusElement,"groupId"));
remoteProcessGroupInstance.setRemoteProcessGroupTargetURL(checkExists(componentElement,"targetUri"));
remoteProcessGroupInstance.setRemoteProcessGroupBulletins(bulletinElement.asText());
remoteProcessGroupInstance.setRemoteProcessGroupTransmitting(Boolean.valueOf(checkExists(componentElement,"transmitting")));
remoteProcessGroupInstance.setRemoteProcessGroupTransmissionStatus(checkExists(statusElement,"transmissionStatus"));
remoteProcessGroupInstance.setRemoteProcessGroupActiveThreadCount(Double.valueOf(checkExists(aggregateSnapshot,"activeThreadCount")));
remoteProcessGroupInstance.setRemoteProcessGroupFlowFilesReceived(Double.valueOf(checkExists(aggregateSnapshot,"flowFilesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupBytesReceived(Double.valueOf(checkExists(aggregateSnapshot,"bytesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupReceived(checkExists(aggregateSnapshot,"received"));
remoteProcessGroupArrayList.add(remoteProcessGroupInstance);
}
}
catch(Exception e){
logger.info("There was an error creating the list of remote process groups: " + e.getMessage());
}
  1. "进程组/{id}/远程进程组"是ProcessGroupsAPI子部分的一部分,它将返回RemoteProcessGroupsEntity,其中包含与您提交的id的ProcessGroup绑定的远程进程组的列表
  2. "remote-process-groups/{id}"是RemoteProcessGroups API的一部分,将获取所请求的确切RemoteProcessGroupEntity(请注意缺少复数(

我为NiFi维护名义上的Python客户端,鉴于您提到的寻求结果,我建议您可以尝试:

import nipyapi
nipyapi.utils.set_endpoint('http://localhost:8080/nifi')
rpg_info = [nipyapi.canvas.get_remote_process_group(rpg.id) for rpg in nipyapi.canvas.list_all_remote_process_groups('root', True)]

返回的RPG信息将在.component.parent_group_ID下为您提供父ProcessGroup ID,允许您重建树,但您应该会发现它比单独查找每个树更具性能。

最新更新