Do BlockingQueue方法总是在线程中断时抛出InterruptedException



在我的一个Java 6应用程序中,我有一个线程向主线程提供数据,同时还从数据库中预取更多记录。它使用ArrayBlockingQueue队列作为FIFO缓冲区,其主循环如下:

while (!Thread.interrupted()) {
    if (source.hasNext()) {
        try {
            queue.put(source.next())
        } catch (InterruptedException e) {
            break;
        }
    } else {
        break;
    }
}

有一些代码在循环终止后会进行一些清理,比如毒害队列和释放任何资源,但这几乎就是全部

目前,从主线程到馈送线程没有直接通信:馈送线程使用正确的选项进行设置,然后使用阻塞队列控制数据流。

当队列已满时,主线程需要关闭馈送器时,就会出现问题。由于没有直接控制通道,关闭方法使用线程接口interrupt()馈送线程。不幸的是,在大多数情况下,馈送线程在put()中仍然被阻塞,尽管被中断了——不会引发异常。

通过对interrupt()文档和队列实现源代码的简要阅读,在我看来,put()块通常不使用JVM的任何可中断功能。更具体地说,在我当前的JVM(OpenJDK 1.6b22)上,它阻塞了sun.misc.Unsafe.park()本机方法。也许它使用了spinlock或其他什么,但在任何情况下,这似乎属于以下情况:

如果前面的任何条件都不成立,那么将设置该线程的中断状态。

设置了一个状态标志,但线程在put()中仍然被阻塞,并且不会进一步迭代,因此可以检查该标志。结果如何?一个僵尸线程,就是不会死

  1. 我对这个问题的理解是正确的,还是我遗漏了什么?

  2. 解决这个问题的可能方法是什么?现在我只能想到两个解决方案:

    a。在队列上多次调用poll()来解锁馈送线程:从我所看到的情况来看,这很丑陋,不太可靠,但它基本上是有效的

    b。使用带超时的offer()方法而不是put(),以允许线程在可接受的时间范围内检查其中断状态。

除非我遗漏了什么,否则这是对Java中BlockingQueue实现的一个文档不足的警告。当文档建议中毒队列以关闭工作线程时,似乎有一些迹象,但我找不到任何明确的参考。

编辑:

好的,上面的溶液(a)有一个更剧烈的变化:ArrayBlockingQueue.clear()。我认为这应该一直有效,即使这不是优雅的定义。。。

我认为您的问题可能有两个原因。

  1. 正如《被破坏的门铃定律》中所描述的,你可能没有正确处理中断。在那里你会发现:

    当我们调用可能导致InterruptedException的代码时,我们应该怎么做?不要立即拔出电池!这个问题通常有两个答案:

    从方法中重试InterruptedException这通常是最简单、最好的方法。它被新的java.util.concurrent.*包使用,这解释了为什么我们现在经常接触到这个异常
    捕获它,设置中断状态,返回如果您在调用可能导致异常的代码的循环中运行,则应将状态设置回中断

    例如:

    while (!Thread.currentThread().isInterrupted()) {
        // do something
        try {
            TimeUnit.SECONDS.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    
  2. CCD_ 11或CCD_。请参阅下面添加的了解我如何解决此问题。

我相信在ArrayBlockingqueue.put()中断线程是一个有效的解决方案。

添加

我使用CloseableBlockingQueue解决了问题2,它可以从读取器端关闭。这样,一旦关闭,所有put调用都将快捷。然后,您可以从编写器中检查队列的closed标志。

// A blocking queue I can close from the pull end. 
// Please only use put because offer does not shortcut on close.
// <editor-fold defaultstate="collapsed" desc="// Exactly what it says on the tin.">
class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
  // Flag indicates closed state.
  private volatile boolean closed = false;
  // All blocked threads. Actually this is all threads that are in the process
  // of invoking a put but if put doesn't block then they disappear pretty fast.
  // NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put.
  private final Container<Thread> blocked;
  // Limited size.
  public CloseableBlockingQueue(int queueLength) {
    super(queueLength);
    blocked = new Container<Thread>(queueLength);
  }
  /**
   * *
   * Shortcut to do nothing if closed.
   *
   * Track blocked threads.
   */
  @Override
  public void put(E e) throws InterruptedException {
    if (!closed) {
      Thread t = Thread.currentThread();
      // Hold my node on the stack so removal can be trivial.
      Container.Node<Thread> n = blocked.add(t);
      try {
        super.put(e);
      } finally {
        // Not blocked anymore.
        blocked.remove(n, t);
      }
    }
  }
  /**
   *
   * Shortcut to do nothing if closed.
   */
  @Override
  public E poll() {
    E it = null;
    // Do nothing when closed.
    if (!closed) {
      it = super.poll();
    }
    return it;
  }
  /**
   *
   * Shortcut to do nothing if closed.
   */
  @Override
  public E poll(long l, TimeUnit tu) throws InterruptedException {
    E it = null;
    // Do nothing when closed.
    if (!closed) {
      it = super.poll(l, tu);
    }
    return it;
  }
  /**
   *
   * isClosed
   */
  boolean isClosed() {
    return closed;
  }
  /**
   *
   * Close down everything.
   */
  void close() {
    // Stop all new queue entries.
    closed = true;
    // Must unblock all blocked threads.
    // Walk all blocked threads and interrupt them.
    for (Thread t : blocked) {
      //log("! Interrupting " + t.toString());
      // Interrupt all of them.
      t.interrupt();
    }
  }
  @Override
  public String toString() {
    return blocked.toString();
  }
}

您还需要无锁的Container和O(1)put/get(尽管它不是严格意义上的集合)。它在幕后使用Ring

public class Container<T> implements Iterable<T> {
  // The capacity of the container.
  final int capacity;
  // The list.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
  // Constructor
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more
    for (int i = 0; i < capacity - 1; i++) {
      // Add it.
      it.next = new Node<T>();
      // Step on to it.
      it = it.next;
    }
    // Make it a ring.
    it.next = h;
    // Install it.
    head.set(h);
  }
  // Empty ... NOT thread safe.
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
    // Clear stats.
    resetStats();
  }
  // Add a new one.
  public Node<T> add(T element) {
    // Get a free node and attach the element.
    return getFree().attach(element);
  }
  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    if (skipped < capacity) {
      // Put the head as next.
      // Doesn't matter if it fails. That would just mean someone else was doing the same.
      head.set(freeNode.next);
    } else {
      // We hit the end! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    return freeNode;
  }
  // Mark it free.
  public void remove(Node<T> it, T element) {
    // Remove the element first.
    it.detach(element);
    // Mark it as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }
  // The Node class. It is static so needs the <T> repeated.
  public static class Node<T> {
    // The element in the node.
    private T element;
    // Are we free?
    private AtomicBoolean free = new AtomicBoolean(true);
    // The next reference in whatever list I am in.
    private Node<T> next;
    // Construct a node of the list
    private Node() {
      // Start empty.
      element = null;
    }
    // Attach the element.
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element == null) {
        this.element = element;
      } else {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      // Useful for chaining.
      return this;
    }
    // Detach the element.
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element == element) {
        this.element = null;
      } else {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      // Useful for chaining.
      return this;
    }
    @Override
    public String toString() {
      return element != null ? element.toString() : "null";
    }
  }
  // Provides an iterator across all items in the container.
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }
  // Iterates across used nodes.
  private static class UsedNodesIterator<T> implements Iterator<T> {
    // Where next to look for the next used node.
    Node<T> it;
    int limit = 0;
    T next = null;
    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }
    public boolean hasNext() {
      if (next == null) {
        // Scan to the next non-free node.
        while (limit > 0 && it.free.get() == true) {
          it = it.next;
          // Step down 1.
          limit -= 1;
        }
        if (limit != 0) {
          next = it.element;
        }
      }
      return next != null;
    }
    public T next() {
      T n = null;
      if ( hasNext () ) {
        // Give it to them.
        n = next;
        next = null;
        // Step forward.
        it = it.next;
        limit -= 1;
      } else {
        // Not there!!
        throw new NoSuchElementException ();
      }
      return n;
    }
    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }
  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    Separator comma = new Separator(",");
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    int count = 0;
    s.append("[");
    // Scan to the end.
    while (count < capacity) {
      // Is it in-use?
      if (it.free.get() == false) {
        // Grab its element.
        T e = it.element;
        // Is it null?
        if (e != null) {
          // Good element.
          s.append(comma.sep()).append(e.toString());
          // Count them.
          usedCount += 1;
        } else {
          // Probably became free while I was traversing.
          // Because the element is detached before the entry is marked free.
          freeCount += 1;
        }
      } else {
        // Free one.
        freeCount += 1;
      }
      // Next
      it = it.next;
      count += 1;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}
private AtomicBoolean shutdown=new AtomicBoolean();无效关闭(){shutdown.set(true);}while(!shutdown.get()){if(source.hasNext()){对象项=source.next();while(!shutdown.get()&&!queue.offer(项目,100,时间单位.百万秒){持续}}其他{打破}}

最新更新