这是java中更高效的线程安全队列



有一个简单的任务:许多线程调用MyClass.add()函数,一个线程尝试为它们提供服务
我的问题是:哪种解决方案更好或更有效?

第一种方法:使用CopyOnWriteArrayList

@Singleton
public class myClass {
    List<myType> list = new CopyOnWriteArrayList<myType>();
    boolean isRunning = false;
    //this is called from many threads
    public void add(myType x){
        list.add(x);
    }

    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;
        while (!list.isEmpty()) {
            myType curr = list.remove(0);
            //do something with curr...
        }
        isRunning = false;
    }
}



第二种简单锁定方法:

@Singleton
public class myClass {
    List<myType> list = new ArrayList<myType>();
    boolean isRunning = false;
    private final Lock _mutex = new ReentrantLock(true);
    //this is called from many threads
    public void add(myType x){
        _mutex.lock();
        list.add(x);
        _mutex.unlock();
    }

    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;
        while (!list.isEmpty()) {
            _mutex.lock();
            myType curr = list.remove(0);
            _mutex.unlock();
            //do something with curr...
        }
        isRunning = false;
    }
}



第三种方法:使用ConcurrentLinkedQueue

@Singleton
public class myClass {
    ConcurrentLinkedQueue<myType> list = new ConcurrentLinkedQueue<myType>();
    boolean isRunning = false;
    //this is called from many threads
    public void add(myType x){
        list.add(x);
    }

    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;
        while (!list.isEmpty()) {
            //list cannot be empty at this point: other threads can't remove any items 
            myType curr = list.poll();
            //do something with curr...
        }
        isRunning = false;
    }
}



这是最初错误的解决方案。我不知道为什么它有时会给出(>100个线程)ConcurrentModificationException(尽管有迭代器和"同步"):

@Singleton
public class myClass {
    List<myType> list = Collections.synchronizedList(new ArrayList<myType>());
    boolean isRunning = false;
    //this is called from many threads
    public void add(myType x){
        synchronized(list) {
            list.add(x);
        }
    }

    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;
        for (ListIterator<myType> iter = list.listIterator(); iter.hasNext();){
            myType curr = iter.next();
            //do something with curr...
            synchronized(list) {
                iter.remove(); //sometime it gives ConcurrentModificationException!
            }
        }
        isRunning = false;
    }
}

一般规则是:最适合您的问题的规则。

锁变体大大降低了速度,因为如果所有线程进入锁部分,即使不需要它,它们也会被挂起(如果有5个元素,5个线程可以同时轮询它们,只有第6个线程需要等待)。然而,如果您有一个永远无法共享的单一资源,比如网络连接或文件,那么这个解决方案是好的。

如果您的线程很少写入但经常读取,那么CopyOnWriteArrayList是最好的解决方案。写操作的成本要高得多,而读操作的速度要快得多(与ConcurrentLinkedQueue相比)。但是你的代码主要是写的,所以这对你来说不是一个好的解决方案。

如果读写量大致相等,ConcurrentLinkedQueue是最佳解决方案,因此得名Queue。所以它应该最适合你的情况。

此外,您的代码中有一个严重错误:

while (!list.isEmpty()) {
  myType curr = list.poll();

该列表只是保证每个调用都是原子式完成的,但您的代码并不是因为使用它就自动线程安全的。在本例中,该列表可能已经在isEmpty()poll()之间进行了修改,因此它可能在isEmpty()调用上有1个元素,但在轮询后就没有了。ConcurrentLinkedQueue通过返回null而不是通过您的代码来优雅地处理此问题。因此,正确的形式是:

myType curr;
while ((curr = list.poll()) != null) {

由于轮询是一个原子调用,因此是线程安全调用,所以它要么返回元素,要么不返回。由于线程化,之前发生的事情和之后发生的事情都是未定义的,但您可以确信,这个单独的调用(在后台做得更多)将始终完美工作。

remove(0)调用也是如此,如果最后一个元素已被其间的另一个线程删除,它可以抛出IndexOutOfBoundsException

最新更新