我对生产者-消费者问题的实现方式是否正确?



我已经使用 wait/notify 组合实现了生产者-消费者问题。有人可以告诉我,我对生产者-消费者问题的理解是否正确,以及我的实现是否正确、是否还可以优化吗?

现在我在想,如何使用 ExecutorService、CountDownLatch、ReentrantLock、CyclicBarrier 来实现同样的问题?有什么办法吗?同时,我将尝试看看能否使用闩锁(CountDownLatch)来实现该问题的解决方案。

import java.util.ArrayList;
import java.util.EmptyStackException;
import java.util.Random;
/**
 * Classic single-slot producer/consumer built on the intrinsic monitor
 * ({@code synchronized} + {@code wait}/{@code notifyAll}).
 *
 * <p>Thread-safety: {@link #sharedBuffer} is only ever read or written while
 * holding the {@link #syncher} monitor, so no additional {@code volatile} or
 * concurrent collection is required.
 *
 * <p>Cancellation: both worker methods run until the executing thread is
 * interrupted; on interruption they restore the interrupt flag and return.
 */
public class ProducerConsumerProblem {
    /** Maximum number of elements the buffer may hold before the producer blocks. */
    private static final int CAPACITY = 1;

    /** Pause, in milliseconds, each worker takes after handling one element. */
    private static final long PAUSE_MILLIS = 2000L;

    /** Monitor guarding every access to {@link #sharedBuffer}. */
    private final Object syncher = new Object();

    /** Hand-off buffer; guarded by {@link #syncher}. */
    private final ArrayList<Integer> sharedBuffer = new ArrayList<Integer>();

    /**
     * Starts one producer thread and one consumer thread sharing a single instance.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ProducerConsumerProblem object = new ProducerConsumerProblem();
        Thread producerThread = new Thread(object::produceData, "Producer");
        Thread consumerThread = new Thread(object::consumeData, "Consumer");
        producerThread.start();
        consumerThread.start();
    }

    /**
     * Repeatedly generates a random integer in {@code [0, 10)} and appends it to
     * the shared buffer, blocking while the buffer is full.
     *
     * <p>Returns when the calling thread is interrupted; the interrupt flag is
     * left set so callers can observe the cancellation.
     */
    public void produceData() {
        Random randomNumber = new Random();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (syncher) {
                    // Must be a loop, not an `if`: wait() may return spuriously,
                    // and the condition has to be re-checked after every wakeup.
                    while (sharedBuffer.size() >= CAPACITY) {
                        syncher.wait();
                    }
                    Integer producedElem = randomNumber.nextInt(10);
                    System.out.println("+++ Produced: " + producedElem);
                    sharedBuffer.add(producedElem);
                    // notifyAll stays correct even if more workers are added later.
                    syncher.notifyAll();
                }
                // Sleep outside the monitor so the consumer is not blocked
                // for the whole pause.
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // Restore the flag and stop instead of swallowing the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Repeatedly removes the oldest element from the shared buffer and prints
     * it, blocking while the buffer is empty.
     *
     * <p>Returns when the calling thread is interrupted; the interrupt flag is
     * left set so callers can observe the cancellation.
     */
    public void consumeData() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (syncher) {
                    while (sharedBuffer.isEmpty()) {
                        syncher.wait();
                    }
                    // remove(int) takes the head directly; the loop above
                    // guarantees the buffer is non-empty here.
                    Integer consumedElem = sharedBuffer.remove(0);
                    System.out.println("--- Consumed: " + consumedElem);
                    syncher.notifyAll();
                }
                // Sleep outside the monitor so the producer is not blocked
                // for the whole pause.
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // Restore the flag and stop instead of swallowing the interrupt.
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Producer/consumer hand-off implemented with a bounded {@link BlockingQueue}
 * of capacity one; the queue itself provides all blocking and synchronization.
 *
 * <p>Each worker processes {@link #ITEM_COUNT} elements and then returns. If a
 * worker thread is interrupted it restores the interrupt flag and stops early.
 */
public class ProducerConsumerProblemUsingBlockingQueue {
    /** Number of elements each worker produces or consumes before finishing. */
    private static final int ITEM_COUNT = 10;

    /** Pause, in milliseconds, each worker takes after handling one element. */
    private static final long PAUSE_MILLIS = 2000L;

    /** Exclusive upper bound of the random values generated by the producer. */
    private static final int RANDOM_BOUND = 100;

    /** Single-slot queue shared by the producer and consumer threads. */
    final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);

    /**
     * Starts one consumer thread and one producer thread sharing the same queue.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ProducerConsumerProblemUsingBlockingQueue object = new ProducerConsumerProblemUsingBlockingQueue();
        Thread producerThread = new Thread(() -> object.produceData(object.blockingQueue), "Producer");
        Thread consumerThread = new Thread(() -> object.consumeData(object.blockingQueue), "Consumer");
        consumerThread.start();
        producerThread.start();
    }

    /**
     * Takes {@link #ITEM_COUNT} elements from the queue, printing each one and
     * pausing after it; blocks in {@code take()} while the queue is empty.
     *
     * @param blockingQueue queue to take elements from
     */
    private void consumeData(BlockingQueue<Integer> blockingQueue) {
        // The try wraps the whole loop so an interrupt ends the worker instead
        // of being swallowed and followed by another blocking take().
        try {
            for (int i = 0; i < ITEM_COUNT; i++) {
                System.out.println("Consumed: " + blockingQueue.take().intValue());
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Puts {@link #ITEM_COUNT} random integers in {@code [0, 100)} into the
     * queue, printing each one before the put and pausing after it; blocks in
     * {@code put()} while the queue is full.
     *
     * @param blockingQueue queue to put elements into
     */
    private void produceData(BlockingQueue<Integer> blockingQueue) {
        Random randomObject = new Random();
        // The try wraps the whole loop so an interrupt ends the worker instead
        // of being swallowed and followed by another blocking put().
        try {
            for (int i = 0; i < ITEM_COUNT; i++) {
                int randomNumber = randomObject.nextInt(RANDOM_BOUND);
                System.out.println("Produced: " + randomNumber);
                blockingQueue.put(randomNumber);
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

最新更新