多线程合并排序算法



我有一个类,只要元素实现 Comparable,它就可以对泛型列表进行一些递归合并排序。我正在尝试使代码多线程以提高性能,为此,我有一个静态变量maxThreads,它可以防止我创建的线程数不爆炸,并且我有一个静态变量currentThreads跟踪我当前正在运行的线程数。我的currentThreads变量似乎存在竞争条件,但我无法提出修复它的解决方案。

import java.util.ArrayList;
import java.util.List;
public class ThreadedMergeSorter<E extends Comparable<? super E>> implements, Runnable  
{
  private List<E> list;
  private List<E> left, right;
  private Thread t1, t2;
  private static final int maxThreads = 4;
  private static AtomicInteger currentThreads = new AtomicInteger(0);
  private ThreadedMergeSorter(List<E> list)
  {
    this.list = list;
  }
  public ThreadedMergeSorter(){}

  /**
   * Sorts a List<E> using the merge sorting algorithm
   * @param list the list to be merge sorted
   * @return 
   * @throws InterruptedException 
   */
  public void sort(List<E> list) 
  {
    if(list.size() > 1)
    {                  
      left = new ArrayList<E>(list.subList(0, list.size()/2));
      right = new ArrayList<E>(list.subList(list.size()/2, list.size()));
      list.clear();
      if(currentThreads.get() < maxThreads)
      {
        t1 = new Thread(new ThreadedMergeSorter<E>(left));
        t1.start();
        currentThreads.incrementAndGet();
      }
      else sort(left);
      if(currentThreads.get() < maxThreads)
      {
        t2 = new Thread(new ThreadedMergeSorter<E>(right));
        t2.start();
        currentThreads.incrementAndGet();
      }
      else sort(right);
      try{
        if(t1 != null)
        {
          t1.join();
          currentThreads.decrementAndGet();
        }
        if(t2 != null)
        {
          t2.join();
          currentThreads.decrementAndGet();
        }
      }catch(InterruptedException e){}
      list.addAll(mergeSortedLists(left, right)); 
    } 
  }
  /**
   * Merges two previously sorted List<E extends Comparable<E>> into a single List
   * @param leftArray a List of previously sorted elements
   * @param rightArray a List of previously sorted elements
   * @return an new sorted List
   */
  private List<E> mergeSortedLists(List<E> leftList, List<E> rightList)
  {
    ArrayList<E> list = new ArrayList<E>();
    while(!leftList.isEmpty() && !rightList.isEmpty())
    {
      if((leftList.get(0)).compareTo(rightList.get(0)) <= 0)
        list.add(leftList.remove(0));        
      else
        list.add(rightList.remove(0));
    }
    if(!leftList.isEmpty())
      list.addAll(leftList);
    if(!rightList.isEmpty())
      list.addAll(rightList);
    return list;
  }

  @Override
  public void run() 
  {
    sort(this.list);
  }
}

问题出在 if 语句和try catch块的sort(List<E> list)方法中。

首先,您没有并行运行任何内容。线程以 start() 开头,而不是 run() ,它只是在当前线程上调用 run 方法。

其次,如果您有共享变量正在更新,请尝试将它们声明为 AtomicInteger

private static AtomicInteger currentThreads = new AtomicInteger(0);

然后使用以下方法原子递增/递减:

currentThreads.incrementAndGet();
currentThreads.decrementAndGet();

不要不断地创建、终止和销毁线程。不要试图对线程进行微观管理——正如您所发现的那样,这非常困难且容易出错。

如果你想串接一个合并排序,(这不是一个坏主意:),看看ThreadPoolExecutor和CountDownLatch。

如果您使用的是 Java 7,我建议您使用新的 Fork/Join,并使用 AtomicReferenceArray<E> 而不是 List,以便您可以以线程安全的方式就地进行排序。

另一种解决方案(假设您运行的是 Java 5 及更高版本)可以将 currentThreads 声明为易失性类成员:

private static volatile int currentThreads = 0;

您可以在此处阅读有关volatile关键字的更多信息。

最新更新