如何根据生产者线程状态停止使用者线程



我需要实现一个消费者生产者示例。 这是一个简单的程序,我修改了一点,但我不确定它是否存在潜在的问题。 如果有人能帮助我完善它,我将不胜感激。 我的主要现在的问题是,当生产者完成时,我不知道如何阻止消费者。

我已经尝试了以下代码,但是 stop() 已被弃用,它也不起作用:

    if (!producer.isAlive()) {
            consumer.stop();
    }

生产者消费者.java:

    import java.util.Vector;
    public class ProducerConsumer {
        public static void main(String[] args) {
            int size = 5;
            Vector<Integer> sQ = new Vector<Integer>(size);
            Thread consumer = new Thread(new Consumer(sQ, size));
            Thread producer = new Thread(new Producer(sQ, size));
            consumer.start();
            producer.start();
            if (!producer.isAlive()) {
                consumer.stop();
            }
        }
    }
    class Consumer implements Runnable {
        Vector<Integer> sQ = new Vector<Integer>();
        int size;
        public Consumer(Vector<Integer> sQ, int size) {
            this.sQ = sQ;
            this.size = size;
        }
        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println("Consuming element: " + consume());;
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        private int consume() throws InterruptedException {
            synchronized (sQ) {
                while (sQ.isEmpty()) {
                    System.out.println("The queue is empty and "
                            + Thread.currentThread().getName() + " has to wait."
                            + "size is: " + sQ.size());
                    sQ.wait();
                }
                sQ.notifyAll();
                return sQ.remove(0);
            }
        }
    }
    class Producer implements Runnable {
        Vector<Integer> sQ = new Vector<Integer>();
        int size;
        public Producer(Vector<Integer> sQ, int size) {
            this.sQ = sQ;
            this.size = size;
        }
        @Override
        public void run() {
            for (int i = 0; i < 12; ++i) {
                try {
                    produce(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        private void produce(int i) throws InterruptedException {
            synchronized (sQ) {
                while (sQ.size() == size) {
                    System.out.println("The queue is full and "
                            + Thread.currentThread().getName() + " has to wait."
                            + "size is: " + sQ.size());
                    sQ.wait();
                }
                sQ.add(i);
                sQ.notify();
            }
        }
    }

推荐的方法通常是在需要终止的线程上设置一个布尔标志(finished或类似标志),然后循环while(!finished)。(请注意,通常需要volatile标志,以便线程看到更改。如果线程预计会阻塞,则可以interrupt()它以重新启动其等待循环。

但是,您采取的整体方法似乎已经过时了。BlockingQueue实现是专门为简化生产者-消费者实现而设计的,许多此类问题可以通过使用 Executor 并在任务进入时向其触发任务而不是手动排队和轮询来更有效地处理。

使用 CountdownLatch。这允许您等待它在一个线程中降低,然后实际上从另一个线程降低它。它是线程安全的,专为此用例而设计。

如果您打算使用布尔值,如其中一个公报中所建议的那样,请使用原子布尔值。

通常,应避免使用语言原语(如同步或易失性),而应使用 java.concurrent 包提供的更高级别的结构。如果你要去低层次,你需要对语义有一个深刻的理解。

如果你想重用而不是重新发明,你可能想使用我的并发处理可迭代:https://github.com/jillesvangurp/iterables-support/blob/master/src/main/java/com/jillesvangurp/iterables/ConcurrentProcessingIterable.java

您只需对输入进行搜索,它就会同时生成具有所需线程数的输出。

最新更新