如何在多线程应用程序中连续失败后将主机名添加到阻止列表



我在我的代码中使用了 Callable,它将被多个线程调用,如下所示。截至目前,每当抛出任何RestClientException时,我都会hostname添加到阻止列表中。

public class Task implements Callable<DataResponse> {
    private DataKey key;
    private RestTemplate restTemplate;
    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }
    @Override
    public DataResponse call() {
        ResponseEntity<String> response = null;
        // construct what are the hostnames I can call basis on user id
        List<String> hostnames = some_code_here;
        for (String hostname : hostnames) {
            // If host name is null or host name is in block list, skip sending request to this host
            if (DataUtils.isEmpty(hostname) || DataMapping.isBlocked(hostname)) {
                continue;
            }
            try {
                String url = createURL(hostname);
                response = restTemplate.exchange(url, HttpMethod.GET, key.getEntity(), String.class);
                // some code here to return the response if successful
            } catch (HttpClientErrorException ex) {
                // log exception
                return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
            } catch (HttpServerErrorException ex) {
                // log exception
                return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
            } catch (RestClientException ex) {
                // I don't want to add it to block list instantly.
                // If same hostname as failed five times consecutively, then only add it
                DataMapping.blockHost(hostname);
            }
        }
        return new DataResponse(DataErrorEnum.SERVER_UNAVAILABLE, DataStatusEnum.ERROR);        
    }
}

以下是我在课堂DataMapping内容:

private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = 
        new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());
public static boolean isBlocked(String hostName) {
    return blockedHosts.get().containsKey(hostName);
}
public static void blockHost(String hostName) {
    blockedHosts.get().put(hostName, hostName);
}

问题陈述:-

现在,正如您在call方法中看到的那样,一旦hostname抛出可能不正确的RestClientException,我就会阻止它。我需要查看特定hostname是否连续五次抛出RestClientException,然后仅通过调用此行将此hostname添加到阻止列表中DataMapping.blockHost(hostname);否则不要将其添加到阻止列表中。

什么是

最有效和最好的方法?最多,我总共将拥有 70-100 台独特的机器。

在这种情况下,我的调用方法将从多个线程调用,因此我需要确保为每个hostname正确保留计数,以防它们抛出RestClientException

编辑:

我在DataMapping类中也有以下方法:

我有一个每 2 分钟运行一次的后台线程,替换整个集合,因为我的服务提供真实数据,无论任何主机名是否真的被阻止。我想我确实需要更换整套设备时的atomic reference

我也在代码中本地添加块功能,因为我可能会在 2 分钟后知道哪台机器被阻止,所以如果可能的话,最好事先知道它。

// this is being updated from my background thread which runs every 2 minutes
public static void replaceBlockedHosts(List<String> hostNames) {
    ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
    for (String hostName : hostNames) {
        newBlockedHosts.put(hostName, hostName);
    }
    blockedHosts.set(newBlockedHosts);
}
我会将

每个主机与每个RestClientException递增的AtomicInteger相关联。此整数将在成功调用时设置为零,以强制实施">连续五次"约束。代码看起来像这样。

private final ConcurrentHashMap<String, AtomicInteger> failedCallCount = new ConcurrentHashMap<>();
void call() {
      try {
          String url = createURL(host);
          // make rest call
          resetFailedCallCount(host);
          // ...
      } catch (RestClientException ex) {
          registerFailedCall(host);
          if (shouldBeBlocked(host)) {
              DataMapping.blockHost(host);
          }
      }
}

private boolean shouldBeBlocked(String hostName) {
    AtomicInteger count = failedCallCount.getOrDefault(hostName, new AtomicInteger());
    return count.get() >= 5;
}
private void registerFailedCall(String hostName) {
    AtomicInteger newValue = new AtomicInteger();
    AtomicInteger val = failedCallCount.putIfAbsent(hostName, newValue);
    if (val == null) {
        val = newValue;
    }
    if (val.get() < 5) {
        val.incrementAndGet();
    }
}
private void resetFailedCallCount(String hostName) {
    AtomicInteger count = failedCallCount.get(hostName);
    if (count != null) {
        count.set(0);
    }
}

这是无锁的(至少在我们自己的代码中(,并且非常有效。但是,它容易受到某些竞争条件的影响。最值得注意的是,计数可以大于 5。但是,这应该不是问题,因为主机无论如何都被阻止并且计数不用于其他任何内容。

维护一个静态寄存器,如 - public static ConcurrentHashMap<String, Integer> toBeBlockedHostName = new ConcurrentHashMap<String, Integer>(); 在你的DataMapping类中。然后像这样使用它,你的 FOR 循环:

  for (String hostname : hostnames) {
        // .. some code here
        //After ensuring everything is success and no RestClientException, i.e. can be last line of your TRY block...
        DataMapping.toBeBlockedHostName.remove("stackoverflow6361");
        catch (RestClientException ex) {
            if(DataMapping.toBeBlockedHostName.get("stackoverflow6361") == null){
                DataMapping.toBeBlockedHostName.put("stackoverflow6361", new Integer(1));
            } else{
                if(DataMapping.toBeBlockedHostName.get("stackoverflow6361") == 5){ //Don't hard code 5, have it from some property file after defining as retryThreshold...
                    System.out.println("Blocking threshold reached, block the hostname...");
                    DataMapping.blockHost(hostname);
                } else{
                    DataMapping.toBeBlockedHostName.put("stackoverflow6361", (toBeBlockedHostName.get("stackoverflow6361") + 1));
                }
            }
        }

请注意:: 对于ConcurrentHashMap即使所有操作都是线程安全的,检索操作也不涉及锁定。

请注意,连续 5 次重试失败后,您将阻止主机名,但如果您再次取消阻止它,则应清除寄存器。

PS:为HashMap提供合适的getter和setter。

相关内容

  • 没有找到相关文章

最新更新