在不影响性能或吞吐量的情况下,对所有线程具有完全的原子性



我有一个主机名列表,我应该通过从中生成正确的URL来进行调用。假设我在链表中有四个主机名(主机a、主机B、主机C、主机D)-

  • 执行hostA url,如果hostA为UP,则获取数据并返回响应
  • 但如果hostA已关闭,请将hostA添加到主机名的阻止列表中,并确保没有其他线程正在调用hostA。然后尝试执行hostB url并返回响应
  • 但如果主机B也关闭了,那么也将主机B添加到主机名的阻止列表中,并重复同样的操作

此外,我在我的应用程序中运行了一个后台线程,它将有一个块主机名列表(来自我的另一个服务),我们不应该调用它,但它每10分钟运行一次,所以块主机名的列表只会在10分钟后更新,所以如果存在任何主机名的块列表,那么我不会从主线程调用该主机名,而是尝试调用另一个主机名。这意味着如果hostA被阻止,那么它将在阻止列表中存在hostA,但如果hostA被阻止,则该列表中将没有hostA

下面是我的后台线程代码,它从我的服务URL中获取数据,并在我的应用程序启动后每10分钟运行一次。然后,它将解析来自URL的数据,并将其存储在ClientData类变量中

TempScheduler
public class TempScheduler {
    // .. scheduledexecutors service code to start the background thread
    // call the service and get the data and then parse 
    // the response.
    private void callServiceURL() {
        String url = "url";
        RestTemplate restTemplate = new RestTemplate();
        String response = restTemplate.getForObject(url, String.class);
        parseResponse(response);
    }
    // parse the response and store it in a variable
    private void parseResponse(String response) {
        //...       
        
        // get the block list of hostnames
        Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
        List<String> blockList = new ArrayList<String>();
        for(Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
            for(String hosts : entry.getValue()) {
                blockList.add(hosts);
            }
        }
        
        // store the block list of hostnames which I am not supposed to make a call
        ClientData.replaceBlockedHosts(blockList);
    }
}

下面是我的ClientData课程。replaceBlockedHosts方法将仅由后台线程调用,这意味着只有一个编写器。但isHostBlocked方法将被主应用程序线程多次调用,以检查特定主机名是否被阻止。此外,blockHost方法将从catch block多次调用,以将下行主机添加到blockedHosts列表中,因此我需要确保所有读取线程都能看到一致的数据,并且不会调用该下行主机,而是调用主机名链表中的下一个主机。

客户端数据
public class ClientData {
    // .. some other variables here which in turn used to decide the  list of hostnames
    
    private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = 
            new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());
    public static boolean isHostBlocked(String hostName) {
        return blockedHosts.get().containsKey(hostName);
    }
    public static void blockHost(String hostName) {
        blockedHosts.get().put(hostName, hostName);
    }
    public static void replaceBlockedHosts(List<String> hostNames) {
        ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
        for (String hostName : hostNames) {
            newBlockedHosts.put(hostName, hostName);
        }
        blockedHosts.set(newBlockedHosts);
    }
}

下面是我的主要应用程序线程代码,其中有我应该调用的主机名列表。如果hostname为null或在阻止列表类别中,那么我不会调用该特定主机名,而是尝试列表中的下一个主机名。

@Override
public DataResponse call() {
    List<String> hostnames = new LinkedList<String>();
    
    // .. some separate code here to populate the hostnames list
    // from ClientData class
    
    for (String hostname : hostnames) {     
        // If host name is null or host name is in block list category, skip sending request to this host
        if (hostname == null || ClientData.isHostBlocked(hostname)) {
            continue;
        }
    
        try {
            String url = generateURL(hostname);
            response = restTemplate.getForObject(url, String.class);
            break;
        } catch (RestClientException ex) {
            // add host to block list, 
            // Is this call fully atomic and thread safe for blockHost method 
            // in ClientData class?
            ClientData.blockHost(hostname);
        }
    }
}

每当主机名从主线程中断时,我不需要调用它。我的后台线程也从我的一个服务中获得这些详细信息,每当任何服务器关闭时,它都会有一个主机名列表,这些主机名是块主机,每当它们打开时,该列表就会更新。

而且,每当抛出任何RestClientException时,我都会在blockedHosts concurrentmap中添加该主机名,因为我的后台线程每10分钟运行一次,所以在10分钟完成之前,map不会有这个主机名。每当这个服务器恢复运行时,我的后台都会自动更新这个列表。

我上面的主机名块列表代码是完全原子和线程安全的吗?因为我想要的是-如果hostA关闭,那么在被阻止的主机列表更新之前,任何其他线程都不应该调用hostA。

请记住,与其他主机的通信比您在线程中所做的任何事情都要花费更多的时间。在这种情况下,我不会担心原子操作。

假设我们有线程t1t2。CCD_ 17向CCD_ 18发送请求并等待响应。当达到超时时,将抛出一个RestClientException。现在,从抛出异常到将该主机添加到被阻止主机列表之间的时间跨度非常小。在主机被阻止之前,t2可能会在此时尝试向hostA发送请求,但更有可能的是,t2t1等待响应的长时间内已经发送了请求,这是无法阻止的。

您可以尝试设置合理的超时。当然,还有其他类型的错误不会等待超时,但即使是这样,也比处理异常需要更多的时间。

使用ConcurrentHashMap是线程安全的,应该足以跟踪被阻止的主机。

除非使用compareAndSet之类的方法,否则AtomicReference本身并不能起到多大作用,因此调用不是原子调用(但如上所述,在我看来不需要)。如果您真的想在出现异常后立即阻止主机,则应该使用某种同步。您可以使用同步集来存储被阻止的主机。这仍然无法解决在实际检测到任何连接错误之前需要一些时间的问题。


关于更新:正如评论中所说,Future超时应该大于请求超时。否则,Callable可能会被取消,主机也不会被添加到列表中。使用Future.get时可能甚至不需要超时,因为请求最终会成功或失败。

当主机A宕机时,您看到许多异常的实际问题可能只是许多线程仍在等待主机A的响应。您只在启动请求之前检查被阻止的主机,而不是在任何请求期间。任何仍在等待该主机响应的线程都将继续这样做,直到达到超时为止。

如果你想防止这种情况发生,你可以尝试定期检查当前主机是否还没有被阻止。这是一个非常天真的解决方案,有点违背了期货的目的,因为它基本上是民意调查。不过,这应该有助于理解一般问题。

// bad pseudo code 
DataTask dataTask = new DataTask(dataKeys, restTemplate);
future = service.submit(dataTask);
while(!future.isDone()) {
    if( blockedHosts.contains(currentHost) ) {
        // host unreachable, don't wait for http timeout
        future.cancel(); 
    }
    thread.sleep(/* */);
}

一个更好的方法是在中断发生时向所有等待相同主机的DataTask线程发送中断,这样他们就可以中止请求并尝试下一个主机。

ConcurrentHashMap放入AtomicReference时,操作的原子性不会改变。putget无论如何都是原子的,唯一受影响的操作replaceBlockedHosts也可以使用简单的volatile引用。但我不知道你为什么需要这个。


您的call()方法中有一个检查然后行动模式:

首先,您拨打ClientData.isHostBlocked(hostname)然后您拨打restTemplate.getForObject(generateURL(hostname), …)

因此,当另一个线程调用blockHost时,blockHostisHostBlocked的原子性确实防止了一个线程正好在isHostBlocked调用之后。因此,在后者将主机添加到阻止列表之后,前者仍将继续网络操作。


如果要限制在同一主机上可能失败的线程数,则必须限制访问同一主机的线程数。没有办法绕过它。

最新更新