单项生产者-消费者.我可以在Java中使用哪个数据结构



我希望有一个生产者-消费者问题,只消耗最新的物品。这个问题可能有不同的名字,但我想不通!

生产者线程通过覆盖任何旧项目,以非阻塞的方式生成元素。单个使用者线程应该等待创建一个元素并使用它

我曾考虑过使用阻塞队列,但java实现不允许覆盖旧元素。循环缓冲区(类似于公共库)也不起作用,因为它不为消费者阻塞。

有没有数据结构可以达到这个目的,或者我需要找到更好的方法?

也可以用锁之类的低级别同步工具来解决这个问题,但我不知道如何做到

不需要特殊的数据结构。只需使用Object中可用的方法。在这种情况下,它们非常好,因为阻塞消费者:

class ItemHolder<T> {
    private T item;
    public synchronized void produce(T item) {
        this.item = item;
        notify();
    }
    public synchronized T consume() {
        while (item == null) {
            wait();
        }
        T result = item;
        item = null;
        return result;
    }
}
Java 中的高效循环缓冲区

如果您想对最近的数据窗口进行操作,覆盖循环缓冲区是很好的数据结构。元素是像队列一样添加和删除FIFO的,但在满缓冲区上添加会导致删除最旧的(队列头)元素。

import java.util.NoSuchElementException;  
 /**  
  * Thread safe fixed size circular buffer implementation. Backed by an array.  
  *   
  * @author brad  
  */  
 public class ArrayCircularBuffer<T> {  
      // internal data storage  
      private T[] data;  
      // indices for inserting and removing from queue  
      private int front = 0;  
      private int insertLocation = 0;  
      // number of elements in queue  
      private int size = 0;  
      /**  
       * Creates a circular buffer with the specified size.  
       *   
       * @param bufferSize  
       *      - the maximum size of the buffer  
       */  
      public ArrayCircularBuffer(int bufferSize) {  
           data = (T[]) new Object[bufferSize];  
      }  
      /**  
       * Inserts an item at the end of the queue. If the queue is full, the oldest  
       * value will be removed and head of the queue will become the second oldest  
       * value.  
       *   
       * @param item  
       *      - the item to be inserted  
       */  
      public synchronized void insert(T item) {  
           data[insertLocation] = item;  
           insertLocation = (insertLocation + 1) % data.length;  
           /**  
            * If the queue is full, this means we just overwrote the front of the  
            * queue. So increment the front location.  
            */  
           if (size == data.length) {  
                front = (front + 1) % data.length;  
           } else {  
                size++;  
           }  
      }  
      /**  
       * Returns the number of elements in the buffer  
       *   
       * @return int - the number of elements inside this buffer  
       */  
      public synchronized int size() {  
           return size;  
      }  
      /**  
       * Returns the head element of the queue.  
       *   
       * @return T  
       */  
      public synchronized T removeFront() {  
           if (size == 0) {  
                throw new NoSuchElementException();  
           }  
           T retValue = data[front];  
           front = (front + 1) % data.length;  
           size--;  
           return retValue;  
      }  
      /**  
       * Returns the head of the queue but does not remove it.  
       *   
       * @return  
       */  
      public synchronized T peekFront() {  
           if (size == 0) {  
                return null;  
           } else {  
                return data[front];  
           }  
      }  
      /**  
       * Returns the last element of the queue but does not remove it.  
       *   
       * @return T - the most recently added value  
       */  
      public synchronized T peekLast() {  
           if (size == 0) {  
                return null;  
           } else {  
                int lastElement = insertLocation - 1;  
                if (lastElement < 0) {  
                     lastElement = data.length - 1;  
                }  
                return data[lastElement];  
           }  
      }  
 }  

这里是循环有界队列,它(应该)是线程安全的,并提供阻塞take操作。

public class CircularQueue<T> {
    private final int MAX_SIZE;
    private final AtomicReferenceArray<T> buffer;
    private final AtomicInteger start;
    private final AtomicInteger end;
    private final AtomicInteger len;
    private final ReentrantLock rwlock;
    private final Condition readCondition;
    public CircularQueue(int size) {
        MAX_SIZE = size;
        buffer = new AtomicReferenceArray<T>(size);
        start = new AtomicInteger(0);
        end = new AtomicInteger(0);
        len = new AtomicInteger(0);
        rwlock = new ReentrantLock(true);
        readCondition =  rwlock.newCondition();
    }
    /** 
    * Adds to tail of the queue
    */
    public void put(T val) {
        try {
            rwlock.lock();
            buffer.set(end.get(), val);
            end.set((end.get() + 1) % MAX_SIZE);
            if (len.get() == MAX_SIZE) { // overwrite
                start.set((start.get() + 1) % MAX_SIZE);
            } else {
                len.incrementAndGet();
            }
            readCondition.signal();
        } finally {
            rwlock.unlock();
        }
    }
    /**
     * Blocking removeFront operation       
     * @return
     */
    public T take() {
        T val = null;
        try {
            rwlock.lock();
            while (len.get() == 0) {
                try {
                    readCondition.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            val = buffer.get(start.get());
            buffer.set(start.get(), null);
            start.set((start.get() + 1) % MAX_SIZE);
            len.decrementAndGet();
        } finally {
            rwlock.unlock();
        }
        return val;
    }
    public int size() {
        int curLen = 0;
        try {
            rwlock.lock();
            curLen = len.get();
        } finally {
            rwlock.unlock();
        }
        return curLen;
    }
}

有许多操作尚未添加,如polloffer等。但您可以使用一些线程来测试这一点:

如果JVM运行正确,它将挂起JVM。

public static void main(String[] args) {
    final int MAX_QUEUE_SIZE = 4;
    final CircularQueue<Integer> q = new CircularQueue<Integer>(MAX_QUEUE_SIZE);
    new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Putting: from " + Thread.currentThread().getName() + " " +  i);
                q.put(i);
            }
            for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
                System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
            }   
        }
    }).start();
    new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 10; i < 10 + MAX_QUEUE_SIZE; ++i) {
                try {
                    Thread.sleep(1001);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Putting: from " + Thread.currentThread().getName() + " " +  i);
                q.put(i);
            }
            for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
                System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
            }               
        }
    }).start();
}

您的输出可能与匹配

Putting: from Thread-0 0
Putting: from Thread-1 10
Putting: from Thread-0 1
Putting: from Thread-1 11
Putting: from Thread-0 2
Putting: from Thread-1 12
Putting: from Thread-0 3
Trying to get from Thread-0 11
Trying to get from Thread-0 2
Trying to get from Thread-0 12
Trying to get from Thread-0 3
Putting: from Thread-1 13
Trying to get from Thread-1 13

来自Thread-1的其他take操作正在等待相应的put操作,因为Thread-1比Thread-0稍慢。

Java为此提供的最简单的解决方案是:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()

每个文档:"创建一个执行器,该执行器使用在无边界队列中操作的单个工作线程,并在需要时使用提供的ThreadFactory创建一个新线程"

最新更新