生产者 - 消费者,有限的缓冲区插入误差



我正在尝试进行一项作业,该作业需要在有限的缓冲区中拥有多个生产者和消费者生产和消费随机整数。这是我的代码:

import java.util.Deque;
import java.util.LinkedList;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.Semaphore;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.nio.Buffer;
import java.util.concurrent.TimeUnit;
public class ProducerConsumer 
{   
    interface Buffer
    {
        public abstract void insert(int item);
        public abstract int remove();
    }
    static class BoundedBuffer implements Buffer
    {
        private static final int maxSize = 5;
        private Semaphore mutex;
        private Semaphore empty;
        private Semaphore full;
        private int count;
        private int in;
        private int out;
        private int[] buffer;
        //private ArrayList<Integer> buffer = new ArrayList<Integer>();
        public BoundedBuffer()
        {
            mutex = new Semaphore(1);
            empty = new Semaphore(5);
            full = new Semaphore(0);
            count = 0;
            in = 0;
            out = 0;
        }
        public synchronized int remove()
        {
            int  item = 0;
            while (count == 0)
            {
            }
            try{
                full.acquire();
                mutex.acquire();
            }catch (InterruptedException e) {
                System.out.println("REMOVAL ERROR: " + e);
            }
            count--;
            item = buffer[out];
            out = (out+1) % maxSize;
            System.out.println("      Consumer consumed: " + item);
            mutex.release();
            empty.release();
            return item;
        }
        public synchronized void insert(int item)
        {
            while (count == maxSize)
            {
            }
            try{
                empty.acquire();
                mutex.acquire();
            }catch (InterruptedException e) {
                System.out.println("INSERTION ERROR: " + e);
            }
            count++;
            buffer[in] = item;
            in = (in+1) % maxSize;
            System.out.println("Producer produced " + item);
            mutex.release();
            full.release();
        }
    }
    static class Producer implements Runnable
    {
        private Buffer buffer;
        public Producer(Buffer b)
        {
            buffer = b;
        }
        public void run()
        {
            Random proRand = new Random();
            Random sleepRand = new Random();
            for (int i = 0; i < 100; i++)
            {
                try{
                    Thread.sleep(sleepRand.nextInt((500 - 0) + 0));
                }catch (InterruptedException e) {
                    System.out.println("PRODUCER INTERRUPT: " + e);
                }
                buffer.insert(proRand.nextInt((99999 - 10000) + 10000));
            }
        }
    }
    static class Consumer implements Runnable
    {
        private Buffer buffer;
        public Consumer(Buffer b)
        {
            buffer = b;
        }
        public void run()
        {
            Random sleepRand = new Random();
            for (int i = 0; i < 100; i++)
            {
                try{
                    Thread.sleep(sleepRand.nextInt((500 - 0) + 0));
                }catch (InterruptedException e) {
                    System.out.println("CONSUMER INTERRUPT: " + e);
                }
                buffer.remove();
            }
        }
    }
    public static void main(String[] args)
    {
        Scanner scanner = new Scanner(System.in);
        int sleepTime = scanner.nextInt();
        int numPro = scanner.nextInt();
        int numCon = scanner.nextInt();
        scanner.close();
        System.out.println("Using arguments from command line");
        System.out.println("Sleep time = " + sleepTime);
        System.out.println("Producer threads = " + numPro);
        System.out.println("Consumer threads = " + numCon);
        System.out.println();
        Buffer shared = new BoundedBuffer();
        /* proThread = new Thread(new Producer(shared));
        Thread conThread = new Thread(new Consumer(shared));
        proThread.start();
        conThread.start();*/

        ExecutorService proPool = Executors.newFixedThreadPool(numPro);
        for (int i = 0; i < numPro; i++)
        {
            proPool.submit(new Producer(shared));
        }
        proPool.shutdown();
        ExecutorService conPool = Executors.newFixedThreadPool(numCon);
        for (int i = 0; i < numCon; i++)
        {
            conPool.submit(new Consumer(shared));
        }
        conPool.shutdown();
        try{
            if (!proPool.awaitTermination(20, TimeUnit.SECONDS))
            {
                proPool.shutdownNow();
            }
        }catch (InterruptedException e) {
            System.out.println("TERMINATION ERROR: " + e);
        }
        try{
            if (!conPool.awaitTermination(20, TimeUnit.SECONDS))
            {
                conPool.shutdownNow();
            }
        }catch (InterruptedException e) {
            System.out.println("TERMINATION ERROR: " + e);
        }
        /*for (int i = 0; i < numPro; i++)
        {
            Runnable produce = new Producer();
        }
        for (int i = 0; i < numCon; i++)
        {
            Runnable consume = new Consumer();
        }*/
        //Runnable produce = new Producer();
        //Runnable consume = new Consumer();
        //Thread pro = new Thread(produce, "pro");
        //Thread con = new Thread(consume, "con");
    }
}

现在,输入'20 5 1'后,我将获得以下输出:

" 20 5 1

使用命令行的参数

睡眠时间= 20

生产者线程= 5

消费者线程= 1

插入错误:java.lang.interruptedexception

插入错误:java.lang.interruptedexception

插入错误:java.lang.interruptedexception

删除错误:java.lang.interruptedException

插入错误:java.lang.interruptedexception"

我对造成这种情况有些困惑。我是否需要对我的有限缓冲区的不同数据结构?

我能够发现以下错误:

  • int []缓冲区永远不会初始化
  • 使用挥发性关键字作为"计数","one_answers" OUT"
  • 当您已经使用锁时,请勿使用"同步"
  • 不需要"完整"one_answers"空"信号量,将其删除
  • 由于您正在插入和删除100次,最高500ms的暂停,请在等待中使用至少50秒()

该程序的以下版本

public static class ProducerConsumer {
        interface Buffer {
            void insert(int item);
            int remove();
        }
        static class BoundedBuffer implements Buffer {
            private static final int maxSize = 5;
            private final int[] buffer = new int[maxSize];
            private final Semaphore mutex;
            private volatile int count;
            private volatile int in;
            private volatile int out;
            //private ArrayList<Integer> buffer = new ArrayList<Integer>();
            public BoundedBuffer() {
                mutex = new Semaphore(1);
                count = 0;
                in = 0;
                out = 0;
            }
            public int remove() {
                int item = 0;
                while (count == 0) {
                }
                try {
                    mutex.acquire();
                } catch (InterruptedException e) {
                    System.out.println("REMOVAL ERROR: " + e);
                }
                count--;
                item = buffer[out];
                out = (out + 1) % maxSize;
                System.out.println("Consumer consumed: " + item);
                mutex.release();
                return item;
            }
            public void insert(int item) {
                while (count == maxSize) {
                }
                try {
                    mutex.acquire();
                } catch (InterruptedException e) {
                    System.out.println("INSERTION ERROR: " + e);
                }
                count++;
                buffer[in] = item;
                in = (in + 1) % maxSize;
                System.out.println("Producer produced " + item);
                mutex.release();
            }
        }
        static class Producer implements Runnable {
            private Buffer buffer;
            public Producer(Buffer b) {
                buffer = b;
            }
            public void run() {
                Random proRand = new Random();
                Random sleepRand = new Random();
                for (int i = 0; i < 100; i++) {
                    try {
                        Thread.sleep(sleepRand.nextInt(500));
                    } catch (InterruptedException e) {
                        System.out.println("PRODUCER INTERRUPT: " + e);
                    }
                    try {
                        buffer.insert(proRand.nextInt((99999 - 10000) + 10000));
                    } catch (Exception e) {
                        System.out.println("Error while inserting " + e);
                    }
                }
            }
        }
        static class Consumer implements Runnable {
            private Buffer buffer;
            public Consumer(Buffer b) {
                buffer = b;
            }
            public void run() {
                Random sleepRand = new Random();
                for (int i = 0; i < 100; i++) {
                    try {
                        Thread.sleep(sleepRand.nextInt(500));
                    } catch (InterruptedException e) {
                        System.out.println("CONSUMER INTERRUPT: " + e);
                    }
                    try {
                        buffer.remove();
                    } catch (Exception e) {
                        System.out.println("Error while removing " + e);
                    }
                }
            }
        }
        public static void main(String[] args) {
            int sleepTime = 20;
            int numPro = 5;
            int numCon = 1;
            System.out.println("Using arguments from command line");
            System.out.println("Sleep time = " + sleepTime);
            System.out.println("Producer threads = " + numPro);
            System.out.println("Consumer threads = " + numCon);
            System.out.println();
            Buffer shared = new BoundedBuffer();
        /* proThread = new Thread(new Producer(shared));
        Thread conThread = new Thread(new Consumer(shared));
        proThread.start();
        conThread.start();*/
            ExecutorService proPool = Executors.newFixedThreadPool(numPro);
            for (int i = 0; i < numPro; i++) {
                proPool.submit(new Producer(shared));
            }
            proPool.shutdown();
            ExecutorService conPool = Executors.newFixedThreadPool(numCon);
            for (int i = 0; i < numCon; i++) {
                conPool.submit(new Consumer(shared));
            }
            conPool.shutdown();
            try {
                if (!proPool.awaitTermination(50, TimeUnit.SECONDS)) {
                    proPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                System.out.println("TERMINATION ERROR: " + e);
            }
            try {
                if (!conPool.awaitTermination(50, TimeUnit.SECONDS)) {
                    conPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                System.out.println("TERMINATION ERROR: " + e);
            }
        }
    }

最新更新