信号量实现的面向生产者-消费者的线程池



我目前正在做一项教育任务,其中我必须实现一个仅信号灯线程安全线程池。

我不能在作业期间使用:Synchronize wait notify sleep或任何线程安全的 API。

首先,不要过多地了解我拥有的代码:

  • 实现了线程安全队列(没有两个线程可以同时排队\取消排队((我已经用ConcurrentLinkedQueue测试了这个问题,问题仍然存在(

设计本身:

共享:

  • Tasks信号量 = 0

  • Available信号量 = 0

  • Tasks_Queue队列

  • Available_Queue队列

工作线程:

  • Blocked信号量 = 0

基本信息:

  • 只有管理器(单线程(可以取消排队Tasks_QueueAvailable_Queue

  • 只有应用程序主(单线程(可以排队任务是Tasks_Queue

  • 每个工作线程都可以在Available_Queue中将自己排队

因此,我们混合了一个生产者,一个经理和几个消费者。

  • 当应用程序首次启动时,每个工作线程都会启动并立即在Available_Queue中排队,释放Available信号量并被阻止获取其个人Blocked信号量。
  • 每当 App-Main 将新任务排队时,它都会Task信号量中释放
  • 每当管理器希望执行新任务时,它必须首先获取TasksAvailable信号量。

我的问题:

在应用的运行时,函数dequeue_worker()返回 null 工作线程,即使放置了信号量以保护对队列的访问,当已知没有可用的工作线程时也是如此。

我已经通过递归调用dequeue_worker()如果它绘制空线程来"解决"问题,但这样做假设会使获得信号量许可证永远丢失。 然而,当我将工人数量限制为 1 时,工人不会永远被阻止。

1(我的原始设计的断点是什么?

2(为什么我的"解决方案"没有进一步破坏设计?!

代码片段:

// only gets called by Worker threads: enqueue_worker(this);
    private void enqueue_worker(Worker worker) {
       available_queue.add(worker);
       available.release();
    }
// only gets called by App-Main (a single thread)
    public void enqueue_task(Query query) {
        tasks_queue.add(query);
        tasks.release();
    }
// only gets called by Manager(a single Thread) 
    private Worker dequeue_worker() {
        Worker worker = null;
        try {
            available.acquire();
            worker = available_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } // **** the solution: ****
        if (worker==null) worker = dequeue_worker(); // TODO: find out why
        return worker;
    }
// only gets called by Manager(a single Thread) 
    private Query dequeue_task() {
        Query query = null;
        try {
            tasks.acquire();
            query = tasks_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } 
        return query;
    }
// gets called by Manager (a single thread)
    private void execute() { // check if task is available and executes it
        Worker worker = dequeue_worker(); // available.down()
        Query query = dequeue_task(); //task.down()
        worker.setData(query);
        worker.blocked.release();
    }

最后是工人Run()方法:

while (true) { // main infinite loop
                enqueue_worker(this);
                acquire(); // blocked.acquire();
                <C.S>
                available.release();
            }

您正在调用available.release()两次,一次在enqueue_worker,第二次在主循环中。

最新更新