Java中使用AtomicBoolean的循环调度算法



当我向外部系统发送请求时,我想实现严格的循环调度。有两个外部系统服务器。第一个请求应该转到"System1",第二个请求必须转到"System2",下一个请求必须转至"System1",依此类推

由于我只有两个服务器可以向其发送请求,并且我希望在没有任何阻塞和上下文切换的情况下实现最大性能,因此我选择AtomicBoolean,因为它使用了CAS操作。

我的实现类

1.RoundRobinTest.java

package com.concurrency;
import java.util.Iterator;
public class RoundRobinTest 
{
public static void main(String[] args) 
{
for (int i = 0; i < 500; i++) 
{
new Thread(new RoundRobinLogic()).start();
}
try 
{
// Giving a few seconds for the threads to complete
Thread.currentThread().sleep(2000);
Iterator<String> output = RoundRobinLogic.output.iterator();
int i=0;
while (output.hasNext()) 
{
System.out.println(i+++":"+output.next());
// Sleeping after each out.print 
Thread.currentThread().sleep(20);
}
} 
catch (Exception ex) 
{
// do nothing
}
}
}

2.RoundRobinLogic.java(带有静态AtomicBoolean对象的类)

package com.concurrency;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class RoundRobinLogic implements Runnable 
{
private static AtomicBoolean bool = new AtomicBoolean(true);
public static Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run() 
{
if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to first system
output.add("Request to System2");
}       
}
}

输出:


......................
314:Request to System1
315:Request to System2
316:Request to System1
317:Request to System2
318:Request to System1
319:Request to System1
320:Request to System2
321:Request to System2
322:Request to System1
323:Request to System2
324:Request to System1
325:Request to System2
......................

请求318和319已发送到同一服务器,AtomicBoolean在这种情况下失败。对于我的应用程序,1000-2000个线程可能一次访问共享对象。从Java并发实践中,我看到了以下内容。

在高争用级别上,锁定往往优于原子变量,但在更现实的情况下争用级别原子变量的性能优于锁。这是因为锁通过挂起线程来对争用作出反应,减少了共享存储器总线上的CPU使用和同步业务在低到中等争用的情况下,原子提供了更好的可扩展性;在竞争激烈的情况下,锁提供更好地避免争用。(基于CAS的算法在单CPU系统上也优于基于锁的算法,因为CAS总是在单个CPU系统上成功,除非线程在读取-修改-写入操作。)

现在我有以下问题

  1. 有没有其他有效的非阻塞方式,可以实现循环请求发送
  2. 在激烈的争用下,AtomicBoolean有可能失败吗?我所理解的是,由于激烈的争用,性能/吞吐量可能会下降。但在上面的例子中,AtomicBoolean失败了。为什么

除了John的回答之外,RoundRobinLogic的更干净、可能更高效的实现将使用AtomicIntegerAtomicLong。这消除了将AtomicBoolean的当前值与新值进行比较的需要:

class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
if (systemIndex.incrementAndGet() % 2 == 0) {
// Sending the request to first system
output.add("Request to System1");
} else {
// Sending the request to second system
output.add("Request to System2");
}
}
}

这将允许你很容易地将其扩展到其他系统:

class RemoteSystem
{
private final String name;
RemoteSystem(String name)
{
this.name = name;
}
public String name()
{
return name;
}
}
class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
private static final RemoteSystem[] systems = new RemoteSystem[] {
new RemoteSystem("System1"),
new RemoteSystem("System2"),
new RemoteSystem("System3"),
new RemoteSystem("System4"),
};
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
RemoteSystem system = systems[systemIndex.incrementAndGet() % systems.length];
// Sending the request to right system
output.add("Request to " + system.name());
}
}

假设您使用的不是Queue,而是实际系统的api。我看到的问题与有关

if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to second system
output.add("Request to System2");
}     

如果两个条件都失败了怎么办?这怎么可能?想象一下,在输入第一个if时,布尔为true。然后,您尝试将其设置为false,但另一个线程击败了您,所以您看到了false。然后尝试else if。现在,如果else if在你到达那里时是假的,但设置为true,那么买另一个线程怎么办?那样的话,两次尝试都会失败。

我会将其重构为:

while(true){
boolean current = bool.get();
if(bool.compareAndSet(current, !current){
if(current){ 
//send request to first system
} else {
//send request to second system
}
return;
}
}

正如Sean Bright所提到的,因为您正在添加到队列中,即使像我上面所说的那样实现它,您仍然可能会看到一些值无序,因为队列本身不是与AtomicBoolean同步的一部分。

由于您的需求基本上是:实现的原子操作

  1. 计算并翻转布尔值(或在通用n服务器情况下计算模并递增计数器)
  2. 基于步骤1的结果将条目插入队列中

您无法通过使步骤1和2单独线程安全来真正实现这一点;您必须同步步骤1和2。

以下是一个应该有效的简单实现:

import java.util.LinkedList;
import java.util.Queue;
public class RoundRobinLogic implements Runnable 
{
private static boolean bool = true;
public static final Queue<String> OUTPUT = new LinkedList<String>();
private static final Object LOCK = new Object();
@Override
public void run() {
synchronized (LOCK) {
OUTPUT.add(bool ? "Request to System1" : "Request to System2");
bool = !bool;
}
}
}

关于您的问题:

  1. 如果需要同步两个高于处理器级别的操作,则无法避免阻塞。java.util.concurrent.atomic中的类使用机器级原子指令,这就是为什么使用这些类的代码(通常,取决于平台)不需要阻塞的原因
  2. 在您的实现中,AtomicBoolean没有失败。相反,在读取布尔值和向队列中添加元素之间存在竞争条件

最新更新