具有最小和最大阈值设置的池填充服务



我想让应用程序拥有一个(无界的)对象池。当池的大小降到某个最小阈值时,我想启动池填充服务,生成一定数量的新对象(直到达到最大阈值)。

这是producer-consumer问题的稍微修改版本,但不知何故我被卡住了。由于很容易创建有限大小的BlockingQueue并保持填充,我不知道如何解决我的问题。

我尝试使用ReentrantLock和Condition对象,但Condition#signal()方法要求调用方先持有锁,这在我的情况下是完全不必要的。

在我看来,最好的解决方案是CountDownLatch之类的东西。消费者将减少计数器并最终触发池填充服务。这里的问题是CountDownLatch无法重新启动自己。

任何想法?

换句话说:我有一堆消费者线程和一个生产者线程。生产者应该等待,直到达到最小阈值,生产一些对象,然后再次等待。

Semaphore可以作为生产者的屏障,并且是可重用的。当Semaphore与AtomicBoolean组合使用时,生产者可以在不影响消费者的情况下工作。不过这需要由池本身来处理填充逻辑。
在下面的实现中,生产者立即开始填充池,然后等待池达到最小大小。

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
// http://stackoverflow.com/q/32358084/3080094
/**
 * Demo of a pool that a single producer refills whenever consumers drain it
 * down to a minimum threshold.
 *
 * <p>The producer fills the pool up to the maximum size, then parks on a
 * {@link Semaphore} until a consumer notices the pool is no longer filled
 * above the minimum and releases a permit.
 */
public class RandomWell {
    /**
     * Runs one consumer and one producer against a shared pool for about
     * 100 ms and then interrupts both threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            final FilledPool<Integer> pool = new FilledPool<Integer>(100, 1000);
            // Three parties count down: consumer, producer and this thread.
            final CountDownLatch syncStart = new CountDownLatch(3);
            Thread consumer = new Thread() {
                @Override public void run() {
                    // just to do something, keep track of amount of positive ints from pool
                    int positiveInt = 0;
                    int totalInts = 0;
                    try {
                        syncStart.countDown();
                        syncStart.await();
                        for(;;) {
                            int i = pool.take();
                            if (i > 0) {
                                positiveInt++;
                            }
                            totalInts++;
                            Thread.yield();
                        }
                    } catch (InterruptedException e) {
                        // Interruption is the normal way this demo thread is stopped.
                        System.out.println("Consumer stopped: " + positiveInt + " / " + (totalInts - positiveInt));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            consumer.start();
            Thread producer = new Thread() {
                @Override public void run() {
                    try {
                        Random r = new Random();
                        syncStart.countDown();
                        syncStart.await();
                        for(;;) {
                            int fillTotal = 0;
                            // Keep topping up until the pool is above the minimum;
                            // consumers may drain it while we are filling.
                            while (!pool.isMinFilled()) {
                                int fill = pool.getFillSize();
                                for (int i = 0; i < fill; i++) {
                                    pool.offer(r.nextInt());
                                }
                                fillTotal += fill;
                            }
                            System.out.println("Filled " + fillTotal);
                            pool.awaitNewFilling();
                        }
                    } catch (InterruptedException e) {
                        // Interruption is the normal way this demo thread is stopped.
                        System.out.println("Producer stopped.");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            producer.start();
            syncStart.countDown();
            syncStart.await();
            Thread.sleep(100);
            producer.interrupt();
            consumer.interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Unbounded queue-backed pool with a minimum and maximum fill level.
     *
     * <p>Consumers call {@link #take()}; the producer calls {@link #offer(Object)}
     * and {@link #awaitNewFilling()}. The {@code filling} flag records whether the
     * producer is currently responsible for filling, so that at most one permit
     * is released per filling round.
     *
     * @param <E> element type held by the pool
     */
    static class FilledPool<E> {
        private final LinkedBlockingQueue<E> pool;
        private final int minSize;
        private final int maxSize;
        private final Semaphore needFilling = new Semaphore(0);
        // producer starts filling initially
        private final AtomicBoolean filling = new AtomicBoolean(true);

        /**
         * @param minSize level at or below which a new filling round is requested
         * @param maxSize level the producer fills up to (see {@link #getFillSize()})
         */
        public FilledPool(int minSize, int maxSize) {
            this.minSize = minSize;
            this.maxSize = maxSize;
            pool = new LinkedBlockingQueue<E>();
        }

        /**
         * Requests a filling round if needed, then removes and returns the head
         * of the pool, blocking while the pool is empty.
         *
         * @return the element taken from the pool
         * @throws InterruptedException if interrupted while waiting for an element
         */
        public E take() throws InterruptedException {
            triggerFilling();
            return pool.take();
        }

        /**
         * Wakes the producer when it is idle and the pool is not filled above the
         * minimum. The compare-and-set ensures only one caller releases a permit.
         */
        private void triggerFilling() {
            if (!isFilling() && !isMinFilled() && filling.compareAndSet(false, true)) {
                needFilling.release();
                System.out.println("Filling triggered.");
            }
        }

        /** Adds an element to the pool. */
        public void offer(E e) { pool.offer(e); }

        /**
         * Blocks the producer until a consumer requests a new filling round.
         * Returns immediately when the pool is not filled above the minimum.
         *
         * @throws InterruptedException if interrupted while waiting for the request
         */
        public void awaitNewFilling() throws InterruptedException {
            // must check on minimum in case consumers outpace producer
            if (!isMinFilled()) {
                return;
            }
            filling.set(false);
            // Consumers that drained the pool while the flag was still true did not
            // trigger a filling and may already be blocked in take(), so nobody
            // would ever release a permit. Re-check the level now that the flag is
            // cleared; if we win the flag back, no permit was released and we can
            // resume filling right away. If a consumer won the race instead, its
            // permit makes the acquire below return immediately.
            if (!isMinFilled() && filling.compareAndSet(false, true)) {
                return;
            }
            needFilling.acquire();
        }

        /** Returns the current number of elements in the pool. */
        public int size() { return pool.size(); }

        /** Returns {@code true} when the pool holds more than {@code minSize} elements. */
        public boolean isMinFilled() { return minSize < size(); }

        /** Returns how many elements are missing to reach {@code maxSize}. */
        public int getFillSize() { return maxSize - size(); }

        /** Returns {@code true} while the producer is responsible for filling. */
        public boolean isFilling() { return filling.get(); }
    }
}

更新:我还设法使用ConcurrentLinkedQueue而不是LinkedBlockingQueue使其工作,这使吞吐量增加了一倍,但同时使代码的复杂性增加了一倍。

这是一种方法。它使用ReentrantLock.tryLock()来确保只有一个线程在填充,并且只有在低于阈值时才会填充。

这里的示例代码将运行30秒,然后停止。

package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Entry point of the demo: starts four workers that share one queue and one
 * fill lock, lets them run for 30 seconds, then asks them to stop.
 */
public class Filler {
    /**
     * @param args unused
     * @throws InterruptedException if interrupted while sleeping or while
     *         waiting for the workers to terminate
     */
    public static void main(String... args) throws InterruptedException {
        final AtomicBoolean stop = new AtomicBoolean(false);
        final Lock fillLock = new ReentrantLock();
        final LinkedBlockingDeque<Object> queue = new LinkedBlockingDeque<>();
        final ExecutorService workers = Executors.newFixedThreadPool(4);

        // One worker per pool thread.
        int submitted = 0;
        while (submitted < 4) {
            workers.execute(new Worker(queue, fillLock, stop));
            submitted++;
        }

        // Let the workers run, then raise the stop flag they poll.
        TimeUnit.SECONDS.sleep(30);
        stop.set(true);

        workers.shutdown();
        workers.awaitTermination(30, TimeUnit.SECONDS);
    }
}
/**
 * Consumes objects from a shared deque and refills it when it runs low.
 *
 * <p>Every worker both consumes and, when needed, produces. The shared
 * {@link Lock} is taken with {@code tryLock()} so that at most one worker
 * fills at a time while the others carry on.
 */
class Worker implements Runnable {
    /** Queue size below which a refill is attempted. */
    private static final int FILL_THRESHOLD = 10;
    /** Number of objects added per refill. */
    private static final int FILL_BATCH = 100;

    private final LinkedBlockingDeque<Object> linkedBlockingDeque;
    private final Lock fillLock;
    private final AtomicBoolean stop;

    /**
     * @param linkedBlockingDeque shared queue to consume from and refill
     * @param fillLock lock that guards refilling
     * @param stop flag polled by {@link #run()}; the loop ends once it is {@code true}
     */
    Worker(LinkedBlockingDeque<Object> linkedBlockingDeque, Lock fillLock, AtomicBoolean stop) {
        this.linkedBlockingDeque = linkedBlockingDeque;
        this.fillLock = fillLock;
        this.stop = stop;
    }

    /**
     * Polls the queue (waiting up to one second per attempt), handles each
     * object obtained and attempts a refill whenever the queue is low. Exits
     * when the stop flag is set or the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (!stop.get()) {
                Object o = linkedBlockingDeque.poll(1, TimeUnit.SECONDS);
                if (o != null) {
                    handle(o);
                }
                if (linkedBlockingDeque.size() < FILL_THRESHOLD) {
                    tryFill();
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Refills the queue if no other worker is currently filling and the queue
     * is still below the threshold. Returns immediately when the lock is held
     * by another thread.
     */
    protected void tryFill() {
        if (fillLock.tryLock()) {
            try {
                // Re-check under the lock: another worker may have completed a
                // refill between the caller's size check and our tryLock().
                if (linkedBlockingDeque.size() < FILL_THRESHOLD) {
                    System.out.println("Filling");
                    for (int i = 0; i < FILL_BATCH; i++) {
                        linkedBlockingDeque.add(new Object());
                    }
                }
            } finally {
                fillLock.unlock();
            }
        }
    }

    /**
     * Processes one object taken from the queue; this demo just prints it.
     *
     * @param object the object taken from the queue
     */
    protected void handle(Object object) {
        // %n emits the platform line separator (the original "%sn" printed a literal 'n').
        System.out.printf("object: %s%n", object);
        //TODO: blah blah blah stuff
    }
}

如果你想让其他线程在队列正在被填充时阻塞,只要让它们在tryLock()未能获得锁时改为阻塞等待该锁即可。

将tryFill()更改为如下所示:

protected void () {

if (fillLock.tryLock()) {
    try {
        System.out.println("Filling");
        for (int i = 0; i < 100; i++) {
            linkedBlockingDeque.add(new Object());
        }
    } finally {
        fillLock.unlock();
    }
} else {
    fillLock.lock();
    try {
    } finally {
        fillLock.unlock();
    }
}

}

或者,您可以为此使用条件——我将把它留给op作为练习。

我认为可以使用另一个BlockingQueue来通知Producer是时候唤醒并填充池了。这样的:

package com.stackoverflow;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Wires up the demo: one producer and several consumers share an item pool,
 * a size counter and a second queue the consumers use to signal the producer.
 */
public class Main {
    /** Pool size at or below which consumers signal the producer. */
    public static final int LOWER_BOUND = 42;
    /** Pool size at which the producer stops adding items. */
    public static final int UPPER_BOUND = 84;
    /** Number of consumer threads started by {@link #main(String[])}. */
    public static final int CONSUMERS_COUNT = 10;

    /**
     * Starts the producer thread first, then the consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AtomicInteger poolSize = new AtomicInteger(0);
        BlockingQueue<Object> items = new LinkedBlockingQueue<>();
        BlockingQueue<Object> wakeUps = new LinkedBlockingQueue<>();

        new Thread(new Producer(items, poolSize, wakeUps)).start();

        int started = 0;
        while (started < CONSUMERS_COUNT) {
            new Thread(new Consumer(items, poolSize, wakeUps)).start();
            started++;
        }
    }
}
/**
 * Adds items to the pool until the counter reaches {@code Main.UPPER_BOUND},
 * then blocks on the command queue until a consumer signals it.
 */
class Producer implements Runnable {
    private final BlockingQueue<Object> pool;
    private final AtomicInteger currentPoolSize;
    private final BlockingQueue<Object> commandsForProducer;

    /**
     * @param pool queue the produced items are added to
     * @param currentPoolSize shared counter tracking the number of items in the pool
     * @param commandsForProducer queue on which consumers post wake-up signals
     */
    Producer(BlockingQueue<Object> pool, AtomicInteger currentPoolSize, BlockingQueue<Object> commandsForProducer) {
        this.pool = pool;
        this.currentPoolSize = currentPoolSize;
        this.commandsForProducer = commandsForProducer;
    }

    /**
     * Runs the fill/wait cycle until the thread is interrupted. On interruption
     * the interrupt status is restored and the method returns.
     */
    @Override
    public void run() {
        final String name = Thread.currentThread().getName();
        try {
            while (true) {
                if (currentPoolSize.get() < Main.UPPER_BOUND) {
                    pool.add(new Object());
                    System.out.println(name + " producer, items in pool:" +
                            currentPoolSize.incrementAndGet());
                    Thread.sleep(4); // Simulating work
                } else {
                    System.out.println(name + " producer is trying to sleep");
                    // Block until at least one consumer has signalled.
                    commandsForProducer.take();
                    System.out.println(name + " producer awakes");
                    // Drop any further signals that piled up in the meantime.
                    commandsForProducer.clear();
                }
            }
        } catch (InterruptedException e) {
            // Stop instead of swallowing the interrupt and looping on; keep the
            // interrupt status visible to the code that owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Takes items from the pool and signals the producer whenever the counter is
 * at or below {@code Main.LOWER_BOUND}.
 */
class Consumer implements Runnable {
    private final BlockingQueue<Object> pool;
    private final AtomicInteger currentPoolSize;
    private final BlockingQueue<Object> commandsForProducer;

    /**
     * @param pool queue the items are taken from
     * @param currentPoolSize shared counter tracking the number of items in the pool
     * @param commandsForProducer queue used to post wake-up signals for the producer
     */
    Consumer(BlockingQueue<Object> pool, AtomicInteger currentPoolSize, BlockingQueue<Object> commandsForProducer) {
        this.pool = pool;
        this.currentPoolSize = currentPoolSize;
        this.commandsForProducer = commandsForProducer;
    }

    /**
     * Runs the signal/take cycle until the thread is interrupted. On
     * interruption the interrupt status is restored and the method returns.
     */
    @Override
    public void run() {
        final String name = Thread.currentThread().getName();
        try {
            while (true) {
                // Signal before taking so the producer is woken even if the
                // take below has to block on an empty pool.
                if (currentPoolSize.get() <= Main.LOWER_BOUND) {
                    System.out.println(name + " signaled to producer");
                    commandsForProducer.add(new Object());
                }
                pool.take();
                System.out.println(name + " consumer, items in pool:" +
                        currentPoolSize.decrementAndGet());
                Thread.sleep(50); // Simulating work
            }
        } catch (InterruptedException e) {
            // Stop instead of swallowing the interrupt and looping on; keep the
            // interrupt status visible to the code that owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}

最新更新