Java通过多线程共享对象——需要设计模式



我想得到一些关于我正在设计的简单多线程系统的建议。

:应用程序正在捕获帧并在第一个imageview中显示它们。这些捕获的帧也被处理(通过MyHandDetectionThread),然后显示在第二个imageview中。

我的解决方案:

public class VideoManager {
    private volatile BufferLinkedList<InputFrame> mInputFrames;
    private volatile BufferLinkedList<ProcessedFrame> mProcessedFrames;
    private static VideoManager mVideoManagerInstance = new VideoManager();
    private Timer captureTimer;
    private MyVideoCaptureThread myVideoCaptureThread;
    private MyFrameDisplayThread myFrameDisplayThread;
    private MyHandDetectionThread myHandDetectionThread;
    private MyProcessedFrameDisplayThread myProcessedFrameDisplayThread;
    private enum ThreadMessages {
        PROCESS_INPUT_FRAME,
        NEW_INPUT_FRAME,
        NEW_PROCESSED_FRAME_ARRIVED,
        GET_NEW_FRAME
    }
    public static VideoManager getInstance() {
        if (mVideoManagerInstance == null) {
            mVideoManagerInstance = new VideoManager();
        }
        return mVideoManagerInstance;
    }
    // not visible constructor - for singleton purposes
    private VideoManager() {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
    }
    public void startDetectionAndRecognition(ImageView camIV, ImageView handIV) {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        captureTimer = new Timer();
        myVideoCaptureThread = new MyVideoCaptureThread();
        myFrameDisplayThread = new MyFrameDisplayThread(camIV, handIV);
        myHandDetectionThread = new MyHandDetectionThread();
        myProcessedFrameDisplayThread = new MyProcessedFrameDisplayThread();
        captureTimer.schedule(new TimerTask() {
            public void run() {
                if (myVideoCaptureThread != null && myVideoCaptureThread.threadMessages != null)
                    myVideoCaptureThread.threadMessages.offer(ThreadMessages.GET_NEW_FRAME);
            }
        }, 0, 1000 / Config.fps);
        myFrameDisplayThread.start();
        myVideoCaptureThread.start();
        myHandDetectionThread.start();
        myProcessedFrameDisplayThread.start();
    }
    public void stop() {
        captureTimer.cancel();
        myVideoCaptureThread.interrupt();
        myHandDetectionThread.interrupt();
        myFrameDisplayThread.interrupt();
        myGestureRecogitionThread.interrupt();
        mInputFrames.removeAll(mInputFrames);
        mProcessedFrames.removeAll(mProcessedFrames);
        isActive = false;
    }
    public boolean isActive() {
        return isActive;
    }
    ////////////////////////
    // Thread clases
    ////////////////////////
    private class MyVideoCaptureThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);
        @Override
        public void run() {
            WebCamVideoCapture vc = new WebCamVideoCapture();
            while (!isInterrupted()) {
                if (threadMessages != null && threadMessages.poll() == ThreadMessages.GET_NEW_FRAME) {
                    Mat mat = vc.getNextMatFrame();
                    if (mat != null && mInputFrames != null) {
                        mInputFrames.offerFirst(new InputFrame(mat));
                        if (myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null)
                            myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_INPUT_FRAME);
                        if (myHandDetectionThread != null && myHandDetectionThread.threadMessages != null)
                            myHandDetectionThread.threadMessages.offer(ThreadMessages.PROCESS_INPUT_FRAME);
                    }
                }
            }
            vc.close();
        }
    }
    private class MyFrameDisplayThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);
        ImageView mCamImageView;

        long lastUpdatedCamImageViewMillis;
        long lastUpdatedHandImageViewMillis;
        public MyFrameDisplayThread(ImageView mImageView) {
            this.mCamImageView = mImageView;
        }
        private synchronized void updateImageViews() {
            if (threadMessages.poll() == ThreadMessages.NEW_INPUT_FRAME && mInputFrames != null && !mInputFrames.isEmpty() && mInputFrames.peek() != null && mInputFrames.peek().getFrame() != null) {
                if(Config.IS_DEBUG) System.out.println("Updating frame image view");
                mCamImageView.setImage(Utils.cvMatToImage(mInputFrames.peekFirst().getFrame()));
            } 
        }
        @Override
        public void run() {
            while (!isInterrupted()) {
                updateImageViews();
            }
        }
    }
    private class MyHandDetectionThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128); //TODO if multiple threads, define it out of class
        HandDetector hd = new HandDetector();
        @Override
        public void run() {
            while (!isInterrupted()) {
                if (threadMessages.poll() == ThreadMessages.PROCESS_INPUT_FRAME && mInputFrames != null && mInputFrames.size() > 0 && mInputFrames.peek() != null) {
                    if(Config.IS_DEBUG) System.out.println("Detecting hand...");
                    mProcessedFrames.offerFirst(new ProcessedFrame(hd.detectHand(mInputFrames.peek()), null, null, null));
                    if (myGestureRecogitionThread != null && myGestureRecogitionThread.threadMessages != null)
                        myGestureRecogitionThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                    if(myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null)
                        myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                }
            }
        }
    }
    private class MyProcessedFrameDisplayThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);
        ImageView mHandImageView;
        public MyProcessedFrameDisplayThread(ImageView mHandImageView) {
            mHandImageView = mHandImageView;
        }
        private synchronized void updateImageViews() {
            if(threadMessages.poll() == ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED && mProcessedFrames != null && !mProcessedFrames.isEmpty() && mProcessedFrames.peek() != null && mProcessedFrames.peek().getmHandMask() != null) {
                if(Config.IS_DEBUG) System.out.println("Updating hand image view");
                mHandImageView.setImage(Utils.cvMatToImage(mProcessedFrames.peekFirst().getmHandMask()));
            }
        }
        @Override
        public void run() {
            while (!isInterrupted())
                if (threadMessages.poll() == ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED)
                    updateImageViews();
        }
    }
}

public class BufferLinkedList<E> extends LinkedList<E> {
    private int counter = 0;
    private int sizeLimit = 48;
    public BufferLinkedList(int sizeLimit) {
        this.sizeLimit = sizeLimit;
    }
    @Override
    public synchronized boolean offerFirst(E e) {
        while(size() > sizeLimit) {
            removeLast();
        }
        return super.offerFirst(e);
    }
    @Override
    public synchronized E peekFirst() {
        return super.peekFirst();
    }
    @Override
    public synchronized E peekLast() {
        return super.peekLast();
    }
    @Override
    public synchronized E pollFirst() {
        return super.pollFirst();
    }
    @Override
    public synchronized E pollLast() {
        return super.pollLast();
    }
}

我的问题:帧显示不流畅。在更新imageview的方法之间有一个不规则的,1-5秒的间隔。然而,MyHandDetectionThread的任务运行得非常快。显示线程的消息队列的大小也在快速增长。也许这是因为存储帧的列表上有一些锁?

:我的解决方法正确吗?是否有一些设计模式描述这种场景?你有什么改进的建议吗?

编辑:

我在线程循环中添加了等待和通知。结果令人满意。CPU成本现在是30%,而以前是80%。一切运行更稳定,更顺畅。然而,我不熟悉等待和通知的方法。所以如果你在我的代码中发现了什么愚蠢的东西,请告诉我。

public class VideoManager {
    private volatile BufferLinkedList<InputFrame> mInputFrames;
    private volatile BufferLinkedList<ProcessedFrame> mProcessedFrames;
    private static VideoManager mVideoManagerInstance = new VideoManager();
    private Timer captureTimer;
    private MyVideoCaptureThread myVideoCaptureThread;
    private MyFrameDisplayThread myFrameDisplayThread;
    private MyHandDetectionThread myHandDetectionThread;
    private MyGestureRecogitionThread myGestureRecogitionThread;
    private MySkinDisplayThread mySkinDisplayThread;
    private final static int THREAD_MESSAGES_LIMIT = 10000;
    private final static int TIMER_INTERVAL = 1000 / Config.fps;
    private final static int WAITING_TIMEOUT = 2000;
    private enum ThreadMessages {
        PROCESS_INPUT_FRAME,
        NEW_INPUT_FRAME,
        NEW_PROCESSED_FRAME_ARRIVED,
        GET_NEW_FRAME
    }
    public static VideoManager getInstance() {
        if (mVideoManagerInstance == null) {
            mVideoManagerInstance = new VideoManager();
        }
        return mVideoManagerInstance;
    }
    // not visible constructor - for singleton purposes
    private VideoManager() {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
    }
    public void startDetectionAndRecognition(ImageView camIV, ImageView handIV) {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        captureTimer = new Timer();
        myFrameDisplayThread = new MyFrameDisplayThread(camIV);
        myVideoCaptureThread = new MyVideoCaptureThread();
        myHandDetectionThread = new MyHandDetectionThread();
        myGestureRecogitionThread = new MyGestureRecogitionThread();
        mySkinDisplayThread = new MySkinDisplayThread(handIV);
        myVideoCaptureThread.start();
        captureTimer.schedule(new TimerTask() {
            public void run() {
                if (myVideoCaptureThread != null && myVideoCaptureThread.threadMessages != null) {
                    myVideoCaptureThread.threadMessages.offer(ThreadMessages.GET_NEW_FRAME);
                    System.out.println("Timer get frame request sent");
                    myVideoCaptureThread.wakeUp();
                }
            }
        }, 0, TIMER_INTERVAL);
        myFrameDisplayThread.start();
        mySkinDisplayThread.start();
        myHandDetectionThread.start();
        myGestureRecogitionThread.start();
    }
    public void stop() {
        captureTimer.cancel();
        myVideoCaptureThread.interrupt();
        myHandDetectionThread.interrupt();
        mySkinDisplayThread.interrupt();
        myFrameDisplayThread.interrupt();
        myGestureRecogitionThread.interrupt();
        mInputFrames.removeAll(mInputFrames);
        mProcessedFrames.removeAll(mProcessedFrames);
    }
    ////////////////////////
    // Lock class
    ////////////////////////
    private static final class Lock {}
    ////////////////////////
    // Thread clases
    ////////////////////////
    private class MyVideoCaptureThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        WebCamVideoCapture vc = new WebCamVideoCapture();
        Lock lock = new Lock();
        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.GET_NEW_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                            System.out.println("WideoCaptureThread waiting");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Mat mat = vc.getNextMatFrame();
                    System.out.println("getting next frame from webcam");
                    if (mat != null && mInputFrames != null) {
                        mInputFrames.offerFirst(new InputFrame(vc.getNextMatFrame()));
                        if (myHandDetectionThread != null && myHandDetectionThread.threadMessages != null) {
                            myHandDetectionThread.wakeUp();
                            myHandDetectionThread.threadMessages.offer(ThreadMessages.PROCESS_INPUT_FRAME);
                        }
                        if (myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null) {
                            myFrameDisplayThread.wakeUp();
                            myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_INPUT_FRAME);
                        }
                    }
                }
            }
        }
        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up WideoCapture");
            }
        }
        @Override
        public void interrupt() {
            vc.close();
            super.interrupt();
        }
    }
    private class MyFrameDisplayThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        Lock lock = new Lock();
        ImageView mCamImageView;
        public MyFrameDisplayThread(ImageView mImageView) {
            this.mCamImageView = mImageView;
        }
        private void updateImageViews() {
            if (shouldUpdateCamImageView() && mInputFrames != null && !mInputFrames.isEmpty() && mInputFrames.peek() != null && mInputFrames.peek().getFrame() != null) {
                System.out.println("Updating frame image view");
                mCamImageView.setImage(Utils.cvMatToImage(mInputFrames.peekFirst().getFrame()));
                threadMessages.poll();
            }
        }
        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.peek() != ThreadMessages.NEW_INPUT_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    updateImageViews();
                }
            }
        }
        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up FrameDisplay");
            }
        }
        private boolean shouldUpdateCamImageView() {
            if (!Config.CAPTURE_PREVIEW_MODE) return false;
            return true;
        }
    }
    private class MySkinDisplayThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        ImageView mHandImageView;
        Object lock = new Lock();
        public MySkinDisplayThread(ImageView mHandImageView) {
            this.mHandImageView = mHandImageView;
        }
        private synchronized void updateHandImageView() {
            if (shouldUpdateHandImageView() && mProcessedFrames != null && !mProcessedFrames.isEmpty() && mProcessedFrames.peek() != null && mProcessedFrames.peek().getmHandMask() != null) {
                System.out.println("Updating skin image view");
                mHandImageView.setImage(Utils.cvMatToImage(mProcessedFrames.peekFirst().getmHandMask()));
                threadMessages.poll();
            }
        }
        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.peek() != ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    updateHandImageView();
                }
            }
        }
        private boolean shouldUpdateHandImageView() {
            if (!Config.SKIN_MASK_PREVIEW_MODE) return false;
            return true;
//            long now = System.currentTimeMillis();
//            boolean should = now - lastUpdatedHandImageViewMillis > TIMER_INTERVAL;
//            lastUpdatedHandImageViewMillis = now;
//            return should;
        }
        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up FrameDisplay");
            }
        }
    }
    private class MyHandDetectionThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT); //TODO if multiple threads, define it out of class
        HandDetector hd = new HandDetector();
        Object lock = new Lock();
        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.PROCESS_INPUT_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (mInputFrames != null /*&& mInputFrames.size() > 0 && mInputFrames.peek() != null && !mInputFrames.peek().getIsProcessed()*/) {
                        System.out.println("Detecting hand...");
//                    Mat handMask = hd.detectHand(mInputFrames.peek());
//                    int[][] fingerCoordinates = new int[5][2];
//                    int[] convDefects = new int[5];
//                    int[] handCenterCoordinates = new int[2];
                        mProcessedFrames.offerFirst(new ProcessedFrame(hd.detectHand(mInputFrames.peek()), null, null, null));
                        if (myGestureRecogitionThread != null && myGestureRecogitionThread.threadMessages != null) {
                            myGestureRecogitionThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                            mySkinDisplayThread.wakeUp();
                        }
                        if (mySkinDisplayThread != null && mySkinDisplayThread.threadMessages != null) {
                            mySkinDisplayThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                            mySkinDisplayThread.wakeUp();
                        }
                    }
                }
            }
        }
        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up hand Detection");
            }
        }
    }
    private class MyGestureRecogitionThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        GestureRecognizer r = new GestureRecognizer();
        Lock lock = new Lock();
        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        r.lookForGestures(mProcessedFrames);
                    }
                }
            }
        }
        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up hand Detection");
            }
        }
    }
} 

两个线程似乎都在它们的run()方法中使用轮询;也就是说,它们不断地循环检查布尔条件的语句。这可能不利于CPU使用,因为单个线程可以锁定CPU,而不会给其他线程任何周期;它最终会占用CPU,即使它没有做任何太有用的事情;只是没有满足布尔条件。

你应该使用异步方法与线程通信;与其使用轮询机制,不如在不需要线程进行任何处理时将其置于睡眠状态,并在需要线程时将其唤醒。这允许线程放弃 CPU,这意味着它们愿意放弃它们的活动上下文,以便其他线程可以执行。

最新更新