Java:同步自定义集合类



我想有一个共享的集合类,它由生产者线程填充,输出由消费者线程显示。它有时处理集合类的0元素,但永远不会更进一步。在Eclipse I中,应用程序冻结后观察"DestroyJVM"线程。在生产者上有人工延迟来模拟"慢"的生产者。我不知道为什么应用程序不能按顺序工作,比如"生产者在集合类上获取锁,添加Integer,消费者等待,生产者释放锁,消费者获取锁,消费者打印,消费者释放锁,生产者获取…"等等。谁能指出错误在哪里吗?

下面是我的代码:
import java.util.ArrayList;
import java.util.List;
import static java.lang.System.out;
public class SyncOwnCollMain {
    public static void main(String[] args) {
        SharedIntegers ints = new SharedIntegers();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.setInts(ints);
        consumer.setInts(ints);
        Thread producerThread = new Thread(producer);
        producerThread.setName("ProducerThread");
        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("ConsumerThread");
        producerThread.start();
        consumerThread.start();
    }
}
class SharedIntegers {
    private final List<Integer> ints = new ArrayList<Integer>();
    private final int max = 100;
    public synchronized void addAtPosition(int i, Integer integer) {
            ints.add(i, integer);
    }
    public synchronized Integer getAtPosition(int i) {
            return ints.get(i);
    }
    public synchronized Integer removeAtPosition(int i) {
            return ints.remove(i);
    }
    public synchronized Integer getSize() {
            return ints.size();
    }
    public synchronized boolean isFinished() {
            return max < ints.size();
    }
}
class Producer implements Runnable {
    private SharedIntegers ints;
    private int timeout = 100;
    public SharedIntegers getInts() {
            return ints;
    }
    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }
    @Override
    public void run() {
        out.println("Started ProducerThread");
        if (getInts() != null) {
            int i = 0;
            Integer integer = null;
            while (!getInts().isFinished()) {
                synchronized (getInts()) {
                    integer = i * 3;
                    getInts().addAtPosition(i, integer);
                    out.print("Producer added new integer = " + integer + " at " + i + " position");
                    out.println(". Will sleep now for " + timeout + " ms");
                    try {
                        Thread.sleep(timeout);
                        getInts().wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        }
    }
}
class Consumer implements Runnable {
    private SharedIntegers ints;
    public SharedIntegers getInts() {
            return ints;
    }
    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }
    @Override
    public void run() {
        out.println("Started ConsumerThread");
        if (getInts() != null && getInts().getSize() > 0) {
            int i = 0;
            while (!getInts().isFinished()) {
                synchronized (getInts()) {
                    showAtPosition(i, getInts());
                    i++;
                    try {
                        getInts().wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Thread.yield();
                }
            }
        } else {
            Thread.yield();
        }
    }
    private void showAtPosition(int position, SharedIntegers ints) {
            out.println("sharedInts[" + position + "] -> " + ints.getAtPosition(position));
    }
}

编辑:我设法重写了代码,使它能以期望的方式工作,然而,producerThread和consumerThread不能优雅地退出。知道为什么吗?

import java.util.ArrayList;
import java.util.List;
import static java.lang.System.out;
public class SyncOwnCollMain {
    public static void main(String[] args) {
        out.println("Main application started");
        SharedIntegers ints = new SharedIntegers();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.setInts(ints);
        consumer.setInts(ints);
        Thread producerThread = new Thread(producer);
        producerThread.setName("ProducerThread");
        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("ConsumerThread");
        consumerThread.start();
        try {
            Thread.sleep(1000); // simulate that consumerThread is "anxious" to start
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producerThread.start();
        try {
            consumerThread.join(); //let consumerThread finish before main()
            producerThread.join(); //let producerThread finish before main()
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        out.println("Main application finished");
    }
}
class SharedIntegers {
    private final List<Integer> ints = new ArrayList<Integer>();
    private final int max = 5;
    public synchronized void addAtPosition(int i, Integer integer) {
            ints.add(i, integer);
    }
    public synchronized Integer getAtPosition(int i) {
            return ints.get(i);
    }
    public synchronized Integer removeAtPosition(int i) {
            return ints.remove(i);
    }
    public synchronized Integer getSize() {
            return ints.size();
    }
    public synchronized boolean isFinished() {
            return max <= ints.size();
    }
    public synchronized boolean overflow(int i) {
            return i >= max;
    }
}
class Producer implements Runnable {
    private SharedIntegers ints;
    private final int timeout = 500;
    public SharedIntegers getInts() {
            return ints;
    }
    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }
    @Override
    public void run() {
        out.println("Started ProducerThread");
        if (getInts() != null) {
            int i = 0;
            Integer integer = null;
            synchronized (getInts()) {
                while (!getInts().isFinished()) {
                    integer = i * 3;
                    getInts().addAtPosition(i, integer);
                    out.print("Producer added new integer = " + integer + " at " + i + " position");
                    out.println(". Will sleep now for " + timeout + " ms");
                    try {
                        getInts().notify();
                        getInts().wait();
                        Thread.sleep(timeout); // simulate "slow" producer
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
                try {
                    getInts().wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        out.println("Finished ProducerThread");
    }
}
class Consumer implements Runnable {
    private SharedIntegers ints;
    public SharedIntegers getInts() {
            return ints;
    }
    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }
    @Override
    public void run() {
        out.println("Started ConsumerThread");
        if (getInts() != null) {
            synchronized (getInts()) {
                int i = 0;
                while (!getInts().overflow(i)) {
                    if (getInts().getSize() > 0) {
                        showAtPosition(i, getInts());
                        i++;
                    }
                    try {
                        getInts().notify();
                        getInts().wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        out.println("Finished ConsumerThread");
    }
    private void showAtPosition(int position, SharedIntegers ints) {
            out.println("sharedInts[" + position + "] -> " + ints.getAtPosition(position));
    }
}

编辑2:找到解决方案:需要从producerThread通知consumerThread, getInts()锁可以重新获得。带有我的注释的工作代码看起来像这样(通过consumerThread添加了一些数据修改):

import java.util.ArrayList;
import java.util.List;
import static java.lang.System.out;
public class SyncOwnCollMain {
    public static void main(String[] args) {
        out.println("Main application started");
        SharedIntegers ints = new SharedIntegers();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.setInts(ints);
        consumer.setInts(ints);
        Thread producerThread = new Thread(producer);
        producerThread.setName("ProducerThread");
        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("ConsumerThread");
        consumerThread.start();
        try {
            Thread.sleep(1000); // simulate that consumerThread is "anxious" to start
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producerThread.start();
        try {
            consumerThread.join(); //let consumerThread finish before main()
            producerThread.join(); //let producerThread finish before main()
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        out.println("Main application finished"); // here, main() thread has result produced by producerThread and consumerThread
    }
}
class SharedIntegers {
    private final List<Integer> ints = new ArrayList<Integer>();
    private final int max = 5;
    public synchronized void addAtPosition(int i, Integer integer) {
            ints.add(i, integer);
    }
    public synchronized Integer getAtPosition(int i) {
            return ints.get(i);
    }
    public synchronized Integer removeAtPosition(int i) {
            return ints.remove(i);
    }
    public synchronized Integer getSize() {
            return ints.size();
    }
    public synchronized boolean isFinished() {
            return max <= ints.size();
    }
    public synchronized boolean overflow(int i) {
            return i >= max;
    }
}
class Producer implements Runnable {
    private SharedIntegers ints;
    private final int timeout = 500;
    public SharedIntegers getInts() {
            return ints;
    }
    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }
    @Override
    public void run() {
        out.println("Started ProducerThread");
        if (getInts() != null) {
            int i = 0;
            Integer integer = null;
            synchronized (getInts()) {
                while (!getInts().isFinished()) {
                    integer = i * 3;
                    getInts().addAtPosition(i, integer);
                    out.print("Producer added new integer = " + integer + " at " + i + " position");
                    out.println(". Will sleep now for " + timeout + " ms");
                    try {
                        getInts().notifyAll(); // notify all threads (in this case - consumer thread) that getInts() will be available for other threads to sync and other threads are legitimate to compete for locking getInts()
                        getInts().wait(); // release lock for getInts()
                        Thread.sleep(timeout); // simulate "slow" producer
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
                out.println("Finished ProducerThread while() loop");
                getInts().notifyAll(); // after job is done, need to notify consumer thread that it can compete to obtain getInts() lock
            }
        }   
    }
}
class Consumer implements Runnable {
    private SharedIntegers ints;
    public SharedIntegers getInts() {
            return ints;
    }
    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }
    @Override
    public void run() {
        out.println("Started ConsumerThread");
        if (getInts() != null) {
            int i = 0;
            synchronized (getInts()) {
                while (!getInts().overflow(i)) {
                    if (getInts().getSize() > 0) {
                        out.println(showAtPosition(i, getInts()));
                        increaseAtPosition(i, getInts());
                        out.println("After consumer increase : " + showAtPosition(i, getInts()));
                        i++;
                    }
                    try {
                        getInts().notifyAll(); // notify all threads that other threads are legitimate to compete for getInts() lock
                        getInts().wait(); // release getInts() lock, wait for allowance notification
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                out.println("Finished ConsumerThread while() loop");
            }
        }
    }
    private String showAtPosition(int position, SharedIntegers ints) {
            return "sharedInts[" + position + "] -> " + ints.getAtPosition(position);
    }
    private void increaseAtPosition(int position, SharedIntegers ints) {
        Integer increased = ints.getAtPosition(position)+1;
        ints.removeAtPosition(position);
        ints.addAtPosition(position, increased);
    }
}

getInts().wait();的调用导致每个线程永远等待,因为您从未调用notify(),因此您的应用程序冻结。java.lang.Object.wait()java.lang.Object.notify()参见Javadoc

Producer内部,更改

getInts().wait()

getInts().notify()

最新更新