如何正确地让多个线程访问共享缓冲区



我有一个生产者/消费者的情况,生产者为消费者生成要访问的域。消费者发送一个https请求,从页面中获取链接并将其提交给生产者。当生产者完成时,消费者没有,并挂起最终域。我一辈子都搞不清楚为什么会发生这种事。

我已经简化了我的问题

Main:

public class Main {
public static void main(String[] args) throws InterruptedException {
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);

threadPool.execute(new Consumer(broker));
threadPool.execute(new Consumer(broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();

threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}

经纪人:

public class Broker {
private BlockingQueue<String> QUEUE = new LinkedBlockingQueue<String>();
public Boolean continueProducing = Boolean.TRUE;
public void put(String data) throws InterruptedException
{
this.QUEUE.put(data);
}
public String get() throws InterruptedException
{
//return this.queue.poll(1, TimeUnit.SECONDS);
return this.QUEUE.take();
}
}

消费者:

public class Consumer implements Runnable{
private Broker broker;

public Consumer(Broker broker) {
this.broker = broker;
}
@Override
public void run() {

try {
String data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + Thread.currentThread().getName() + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + Thread.currentThread().getName() + " finished its job; terminating.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
Thread.currentThread().interrupt();
e.printStackTrace();
}

}

}

生产商:

public class Producer implements Runnable{
private Broker broker;

public Producer(Broker broker) {
this.broker = broker;
}
@Override
public void run() {
try
{
for (int i = 0; i < 2; ++i) {
System.out.println("Producer produced: " + "https://example" + i + ".com");
Thread.sleep(100);
broker.put("https://example" + i + ".com");
}
//broker.put("https://example.com/2");
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}catch(Exception e) 
{
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

更新答案:

当我运行您的代码时,使用者会卡在data = broker.get()行。broker正在调用BlockingQueue.take方法。这是这个方法的Javadoc(强调我的(:

检索并删除此队列的头,在必要时等待,直到元素可用

这意味着即使生产者没有生产任何东西,消费者仍然会等待生产。

对你来说,一个可能的解决方案是使用"毒丸"方法。假设您只有一个生产者,那么您的Broker类可能如下所示:

public class Broker {
private static final String POISON_PILL = "__POISON_PILL__";
private BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public void put(String data) {
queue.add(data);
}
public void doneProducing() {
queue.add(POISON_PILL);
}
public String get() throws InterruptedException {
String result = queue.take();
if (result.equals(POISON_PILL)) {
queue.add(POISON_PILL);
return null;
} else {
return result;
}
}
}

以前代码的答案:

如果你能缩小这个问题的范围,使它只包含最少量的代码来获得死锁,那就太好了。目前,你发布的很多代码都不相关,还有一些相关的代码你没有发布。

此外,您当前的代码还有很多问题。您的toLinkedHashSet方法未编译。在您的add方法中,您正在调用BlockingQueue.put方法,即使您的BlockingQueue永远不会达到它的极限。您声称想要为contains提供?(1(时间,但您的代码有?(n(时间。您似乎还在addAllcontains方法中进行了大量不必要的复制。

这里没有足够的信息让我知道问题是什么,但有一件事可能会导致你的问题,那就是你的get方法。如果使用者线程被中断,那么get方法将导致它不中断自身(这可能不会导致死锁,但看起来可能类似于死锁(。在Java中,忽略异常是很难接受的。如果对take方法的调用抛出InterruptedException,那是有原因的:另一个线程希望当前线程停止。您的get方法应该抛出InterruptedException。例如:

public String get() throws InterruptedException {
return unprocessed.take();
}

如果您真的需要get方法来不抛出InterruptedException,那么您可以抛出其他包含InterruptedException的链式异常。如果在中断时返回""真的很合适,你可以这样做:

public String get() {
try {
return unprocessed.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "";
}
}

通过中断当前线程,您可以确保至少当前线程被标记为已中断,这样后续的事情就可以处理它。但如果可能的话,抛出InterruptedException可能是最合适的。

我仍然不明白为什么要为LinkedBlockingQueue创建自己的包装器,而不是单独使用LinkedBlockingQueue。似乎你在LinkedBlockingQueue上添加的所有内容都只是在放慢速度。

最新更新