Producer/Consumer Multithreading



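Background

Since I had no money for school, I work night shifts at a tollbooth and use the internet to teach myself some coding skills, hoping for a better job tomorrow, or to sell some of the apps I make online. The nights are long and there are few customers.

I am tackling multithreading as a topic because I come across a lot of code in the literature that uses it (e.g. the Android SDK), but I still find it obscure.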

Spirit

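My approach at this point is to write the most basic multithreading example I can think of, bang my head against the wall, and see whether I can stretch my brain to accommodate some novel way of thinking. I am exposing myself to my limits in the hope of going beyond them. Feel free to criticize, even nitpick, and point out better ways of doing what I am trying to do.

Goal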

  • Get some advice on how to do the above, based on my efforts so far (code provided)

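Exercise

Here is the scope I defined:

Definition

Create two classes that cooperate on producing and consuming data objects. One thread creates objects and passes them to a shared space, where another thread picks them up and consumes them. Let's call the producing thread Producer, the consuming thread Consumer, and the shared space SharedSpace. The act of producing objects for someone else to consume can be assimilated by analogy to this scenario: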

`Producer`    (a busy mum making chocolate-covered cakes for her child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)

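To simplify the exercise, I decided not to let the mum cook while the child is eating. She just waits for the child to finish the cake and then immediately makes another one, up to a certain limit, for good parenting. The essence of the exercise is to practice thread signalling, not to achieve any concurrency. On the contrary, I focus on perfect serialization, with no polling or "can I go now?" checks. I suppose the next exercise I code will have to be one where mum and child "work" in parallel.

Approach

  • Have my classes implement the Runnable interface so they have their own code entry point

  • Use my classes as constructor arguments for Thread objects that are instantiated and started from the program's main entry point

  • Ensure, via Thread.join(), that the main program does not terminate before the threads do

  • Set a limit on the number of times the Producer creates data

  • Have the Producer and Consumer agree on a sentinel value that the Producer uses to signal the end of data production

  • Log the acquisition of the lock on the shared resource and the data production/consumption events, including the final sign-off of the worker threads

  • Create a SharedSpace object from the program's main and pass it to each worker before starting it

  • Store a private reference to the SharedSpace object inside each worker

  • Provide a guard and a message describing the situation where the Consumer is ready to consume before any data has been produced

  • Stop the Producer after a given number of iterations

  • Stop the Consumer after it reads the sentinel value

Code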


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Consumer extends Threaded {
    public Consumer(SharedSpace sharedSpace) {
        super(sharedSpace);
    }

    @Override
    public void run() {
        super.run();
        int consumedData = 0;
        while (consumedData != -1) {
            synchronized (sharedSpace) {
                logger.info("Acquired lock on sharedSpace.");
                consumedData = sharedSpace.dataValue;
                if (consumedData == 0) {
                    try {
                        logger.info("Data production has not started yet. "
                                + "Releasing lock on sharedSpace, "
                                + "until notification that it has begun.");
                        sharedSpace.wait();
                    } catch (InterruptedException interruptedException) {
                        logger.error(interruptedException.getStackTrace().toString());
                    }
                } else if (consumedData == -1) {
                    logger.info("Consumed: END (end of data production token).");
                } else {
                    logger.info("Consumed: {}.", consumedData);
                    logger.info("Waking up producer to continue data production.");
                    sharedSpace.notify();
                    try {
                        logger.info("Releasing lock on sharedSpace "
                                + "until notified of new data availability.");
                        sharedSpace.wait();
                    } catch (InterruptedException interruptedException) {
                        logger.error(interruptedException.getStackTrace().toString());
                    }
                }
            }
        }
        logger.info("Signing off.");
    }
}

class Producer extends Threaded {
    private static final int N_ITERATIONS = 10;

    public Producer(SharedSpace sharedSpace) {
        super(sharedSpace);
    }

    @Override
    public void run() {
        super.run();
        int nIterations = 0;
        while (nIterations <= N_ITERATIONS) {
            synchronized (sharedSpace) {
                logger.info("Acquired lock on sharedSpace.");
                nIterations++;
                if (nIterations <= N_ITERATIONS) {
                    sharedSpace.dataValue = nIterations;
                    logger.info("Produced: {}", nIterations);
                } else {
                    sharedSpace.dataValue = -1;
                    logger.info("Produced: END (end of data production token).");
                }
                logger.info("Waking up consumer for data consumption.");
                sharedSpace.notify();
                if (nIterations <= N_ITERATIONS) {
                    try {
                        logger.info("Releasing lock on sharedSpace until notified.");
                        sharedSpace.wait();
                    } catch (InterruptedException interruptedException) {
                        logger.error(interruptedException.getStackTrace().toString());
                    }
                }
            }
        }
        logger.info("Signing off.");
    }
}

class SharedSpace {
    volatile int dataValue = 0;
}

abstract class Threaded implements Runnable {
    protected Logger logger;
    protected SharedSpace sharedSpace;

    public Threaded(SharedSpace sharedSpace) {
        this.sharedSpace = sharedSpace;
        logger = LoggerFactory.getLogger(this.getClass());
    }

    @Override
    public void run() {
        logger.info("Started.");
        String workerName = getClass().getName();
        Thread.currentThread().setName(workerName);
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        SharedSpace sharedSpace = new SharedSpace();
        Thread producer = new Thread(new Producer(sharedSpace), "Producer");
        Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        }
    }
}

Execution log


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.

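Questions

  • Is the above correct? (e.g. does it use the right language tools, the right approach, does it contain any stupid code, etc.)

But it "looks right"

I ask about correctness even though the output "looks right", because you cannot imagine how many times things went wrong in my tests "one time" and not "another" (e.g. when the Consumer started first, when the Producer never exited after producing the sentinel, etc.). I have learned not to claim correctness from "a successful run". Instead, I have become very suspicious of pseudo-parallel code! (By definition, this one is not even parallel!)

Extended answers

A good question focuses on just one requested piece of advice (the one above), but if you like, feel free to mention in your answer any insight on the following other topics: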

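  • How can I test parallel code when writing my next attempts?

  • Which tools can help me with development and debugging? Consider that I use Eclipse

  • Would the approach change if I allowed the Producer to keep producing, with each production taking a variable amount of time, while the Consumer consumes whatever is available? Would the lock have to move elsewhere? Would the signalling have to change from this wait/notify pattern?

  • Is this way of doing things outdated? Should I be learning something else? From this tollbooth, I have no idea what happens "in the real world of Java"

Next steps

  • Where should I go next? I have seen the notion of "futures" mentioned somewhere, but I could use a numbered list of topics in sequential, pedantic order, with links to related learning resources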

Tino Sino

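Is the above correct?

The only problem I see is the one @Tudor and @Bhaskar mentioned: whenever you test a condition while waiting on it, you must use a while loop. However, this matters more for race conditions with multiple producers and consumers. Spurious wakeups can happen, but race conditions are much more likely. See my page on the topic.

Yes, you only have one producer and one consumer, but you might try to extend your code to multiple consumers, or reuse it in another scenario.

I have learned not to claim correctness from "a successful run". Instead, I have become very suspicious of pseudo-parallel code!

Good instincts.

How can I test parallel code when writing my next attempts?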

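This is very hard. Scaling up is one way: add multiple producers and consumers and see whether problems appear. Run on multiple architectures with different numbers/types of processors. Your best defense is the correctness of your code: synchronize tightly, and make good use of classes such as BlockingQueue and ExecutorService to make your shutdown simpler and cleaner.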
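A minimal sketch of the "scale it up" idea, under our own assumptions. The class StressTest, the poison-pill value -1 and the thread and item counts are all illustrative. Several producers and consumers share a LinkedBlockingQueue. A CountDownLatch releases them at the same moment to encourage interleaving. The check is an invariant that must hold for every interleaving: the consumed sum must equal the produced sum. Repeating a trial many times raises confidence, but it never proves correctness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stress harness: many producers and consumers share one queue.
class StressTest {
    static final int POISON = -1; // sentinel; one is sent per consumer

    /** Runs one trial and returns the sum of all values the consumers took. */
    static long runTrial(int producers, int consumers, int itemsPerProducer)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicLong consumedSum = new AtomicLong();
        CountDownLatch startGate = new CountDownLatch(1); // releases all threads at once
        List<Thread> producerThreads = new ArrayList<>();
        List<Thread> consumerThreads = new ArrayList<>();

        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                try {
                    startGate.await();
                    for (int i = 1; i <= itemsPerProducer; i++) {
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producerThreads.add(t);
            t.start();
        }
        for (int c = 0; c < consumers; c++) {
            Thread t = new Thread(() -> {
                try {
                    startGate.await();
                    for (int v; (v = queue.take()) != POISON; ) {
                        consumedSum.addAndGet(v);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads.add(t);
            t.start();
        }

        startGate.countDown();
        for (Thread t : producerThreads) {
            t.join();
        }
        // Only after every producer has finished: one poison pill per consumer.
        for (int c = 0; c < consumers; c++) {
            queue.put(POISON);
        }
        for (Thread t : consumerThreads) {
            t.join();
        }
        return consumedSum.get();
    }

    /** Sum of 1..itemsPerProducer, once per producer. */
    static long expectedSum(int producers, int itemsPerProducer) {
        return (long) producers * itemsPerProducer * (itemsPerProducer + 1) / 2;
    }
}
```

One way to use it is to call runTrial in a loop, for example 20 times with 4 producers and 3 consumers, and compare each result with expectedSum.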

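There is no easy answer. Testing multithreaded code is extremely hard.

Which tools can help me with development and debugging?

In general, I would look into a coverage tool such as Emma, so you can make sure your unit tests cover all of your code.

For testing multithreaded code specifically, learn how to read kill -QUIT thread dumps and look at the running threads inside JConsole. A Java profiler such as YourKit may also help.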
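The same kind of information that kill -QUIT (or jstack) prints can also be captured from inside the program with the standard Thread.getAllStackTraces() method. This is handy when a test hangs. The sketch below is rough: the class name ThreadDump and the output layout are our own and are not the JVM's thread-dump format.

```java
import java.util.Map;

// Builds a simple text dump: one header line per live thread, then its stack frames.
class ThreadDump {
    static String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = entry.getKey();
            sb.append('"').append(t.getName()).append("\" state=")
              .append(t.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }
}
```

A thread stuck in sharedSpace.wait() shows up with state WAITING and Object.wait near the top of its frames, which is usually the first clue in a hang.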

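Would the approach change if I allowed the Producer to keep producing, with each production taking a variable amount of time...

I don't think so. The consumer will always wait for the producer. Maybe I don't understand the question?

Is this way of doing things outdated? Should I be learning something else? From this tollbooth, I have no idea what happens "in the real world of Java"

The next thing to learn about is the ExecutorService classes. They take over a large part of new Thread() style code, especially when you are dealing with a number of asynchronous tasks executed by threads. Here's a tutorial.

Where should I go from here?

Again, ExecutorService. I assume you have read these getting-started docs. As @Bhaskar mentioned, Java Concurrency in Practice is a good bible.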


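Here are some general comments about your code:

  • The SharedSpace and Threaded classes seem like a contrived approach. If you are playing around with base classes and the like, that's fine. But in general, I have never used a pattern like this. Producers and consumers usually work with a BlockingQueue such as LinkedBlockingQueue, in which case the synchronization and the volatile payload are taken care of for you. Also, I tend to inject shared information into an object's constructor rather than get it from a base class.

  • Typically, if I use synchronized, it is on a private final field. Often I create a private final Object lockObject = new Object(); for locking, unless I am already working with an object.

  • Beware of huge synchronized blocks and of putting log messages inside synchronized sections. Logging usually does synchronized IO to the file system, which can be very expensive. You should have small, very tight synchronized blocks if possible.

  • You define consumedData outside the loop. I would define it at the point of assignment and then use a break to exit the loop if it is == -1. Always limit the scope of your local variables when you can.

  • Your logging messages are going to dominate your code's performance. This means that when you remove them, your code will behave completely differently. This is very important to keep in mind when you go to debug problems with it. The performance will also (most likely) change when you move to a different architecture with different CPUs/cores.

  • You probably know this, but calling sharedSpace.notify(); only notifies another thread if that thread is currently inside sharedSpace.wait();. If it is doing something else, it will miss the notification. Just FYI.

  • It is a bit strange to test if (nIterations <= N_ITERATIONS) and then test it again 3 lines below the else. Duplicating the notify() would simplify the branching.

  • You have int nIterations = 0;, then a while, then a ++ inside it. That's a recipe for a for loop: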

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
    
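A sketch that applies several of these points at once, assuming a one-slot hand-off like the question's SharedSpace. The class Handoff and its methods are hypothetical names, and System.out stands in for SLF4J to keep it dependency-free. The design works as follows:
- The lock is a private final object.
- Every wait is guarded by a while loop on an explicit full flag, so a notify() that arrives while nobody is waiting cannot lose data.
- The synchronized blocks contain no logging.
- The consumer declares consumedData inside the loop and leaves it with break.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-slot hand-off between one producer and one consumer.
class Handoff {
    static final int END = -1;                 // sentinel, as in the question
    private final Object lock = new Object();  // private: no outside code can lock on it
    private int value;
    private boolean full;                      // explicit state; guarded by lock

    void put(int v) throws InterruptedException {
        synchronized (lock) {
            while (full) {                     // while, not if: re-check after every wake-up
                lock.wait();
            }
            value = v;
            full = true;
            lock.notifyAll();
        }
    }

    int take() throws InterruptedException {
        synchronized (lock) {
            while (!full) {
                lock.wait();
            }
            full = false;
            lock.notifyAll();
            return value;
        }
    }

    /** Runs one producer and one consumer; returns what the consumer saw. */
    static List<Integer> demo(int nIterations) throws InterruptedException {
        Handoff slot = new Handoff();
        List<Integer> consumed = new ArrayList<>(); // read by main only after join()
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= nIterations; i++) {
                    slot.put(i);
                    System.out.println("Produced: " + i); // logged outside the lock
                }
                slot.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int consumedData = slot.take(); // scoped to the loop body
                    if (consumedData == END) {
                        break;
                    }
                    consumed.add(consumedData);
                    System.out.println("Consumed: " + consumedData);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed;
    }
}
```

Because the log calls happen after put() returns, "Consumed" lines can appear before the matching "Produced" line. The logs no longer show lock ownership, only what each thread did.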

Here is a much tighter version of the code. This is just an example of how I would write it. Again, apart from the missing while, your version seems fine.

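import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Consumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until an element is available
                int consumedData = queue.take();
                if (consumedData == Producer.FINAL_VALUE) {
                    logger.info("Consumed: END (end of data production token).");
                    break;
                }
                logger.info("Consumed: {}.", consumedData);
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and stop consuming
            Thread.currentThread().interrupt();
        }
        logger.info("Signing off.");
    }
}

class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private static final int N_ITERATIONS = 10;
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // produces 1..N_ITERATIONS, as in the question
            for (int nIterations = 1; nIterations <= N_ITERATIONS; nIterations++) {
                logger.info("Produced: {}", nIterations);
                queue.put(nIterations);
            }
            queue.put(FINAL_VALUE);
            logger.info("Produced: END (end of data production token).");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        logger.info("Signing off.");
    }
}

public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
        // you can add an int argument to the LinkedBlockingQueue constructor
        // to only allow a certain number of items in the queue at one time
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
        Thread producer = new Thread(new Producer(queue), "Producer");
        Thread consumer = new Thread(new Consumer(queue), "Consumer");
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}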

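You seem to have done a pretty good job here. There's not much to nitpick, really. One thing I'd suggest is to avoid synchronizing on the buffer object itself. It's fine in this case. But suppose you switch to a data-structure buffer: depending on the class, it may synchronize internally (e.g. Vector, although it is obsolete by now), so acquiring its lock from outside could mess things up.

Edit: Bhaskar makes a very good point about wrapping calls to wait in a while loop. This is because of the infamous spurious wakeups, which can force a thread out of wait prematurely, so you need to make sure it goes back in.

A next step could be a bounded-buffer producer-consumer. Have some shared data structure, e.g. a linked list, with a maximum size (e.g. 10 items). Then let the producer keep producing, pausing only when there are 10 items in the queue. The consumer is suspended whenever the buffer is empty.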
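A minimal hand-written version of that bounded buffer, using wait()/notifyAll() on a private lock. The class BoundedBuffer is illustrative, and a LinkedList stands in for the shared linked list. Both waits sit in while loops. notifyAll() is used because, with several producers and consumers on one lock, a single notify() could wake a thread of the wrong kind.

```java
import java.util.Deque;
import java.util.LinkedList;

// Hypothetical bounded buffer built by hand with wait()/notifyAll().
class BoundedBuffer<E> {
    private final Object lock = new Object();
    private final Deque<E> items = new LinkedList<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        this.capacity = capacity;
    }

    /** Blocks while the buffer is full. */
    void put(E item) throws InterruptedException {
        synchronized (lock) {
            while (items.size() == capacity) {
                lock.wait();
            }
            items.addLast(item);
            lock.notifyAll(); // wake any consumers waiting for data
        }
    }

    /** Blocks while the buffer is empty. */
    E take() throws InterruptedException {
        synchronized (lock) {
            while (items.isEmpty()) {
                lock.wait();
            }
            E item = items.removeFirst();
            lock.notifyAll(); // wake any producers waiting for space
            return item;
        }
    }

    int size() {
        synchronized (lock) {
            return items.size();
        }
    }
}
```

With a capacity of 10, a producer that runs ahead fills the buffer and then parks in put() until a consumer calls take().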

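The step after that could be learning how to automate what you have been doing by hand. Look at BlockingQueue, which provides a buffer with blocking behaviour: the consumer blocks automatically if the buffer is empty, and the producer blocks if the buffer is full.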
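Here is a small sketch that shows those blocking rules without writing them yourself. It uses an ArrayBlockingQueue of capacity 2, and the class QueueBehaviour is our own name. put() and take() block when the queue is full or empty. The timed offer() and poll() variants used below give up after the timeout instead, which makes the behaviour observable from a single thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Shows BlockingQueue's built-in full/empty handling on a capacity-2 queue.
class QueueBehaviour {
    static String demo() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        StringBuilder log = new StringBuilder();
        queue.put(1);                 // put() would block if the queue were full
        queue.put(2);
        // the timed offer() gives up instead of blocking forever when full
        boolean accepted = queue.offer(3, 50, TimeUnit.MILLISECONDS);
        log.append("offer when full: ").append(accepted).append('\n');
        log.append("take: ").append(queue.take()).append('\n'); // take() would block if empty
        log.append("take: ").append(queue.take()).append('\n');
        // the timed poll() returns null instead of blocking forever when empty
        Integer none = queue.poll(50, TimeUnit.MILLISECONDS);
        log.append("poll when empty: ").append(none).append('\n');
        return log.toString();
    }
}
```

demo() returns the lines "offer when full: false", "take: 1", "take: 2" and "poll when empty: null".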

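Also, depending on the situation, executors (see ExecutorService) may be a valuable alternative. They encapsulate a task queue and one or more workers (consumers), so all you need to write are the producers.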
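A sketch of that idea, assuming a single-threaded executor is an acceptable consumer. The executor's internal task queue replaces the shared buffer, its worker thread does the consuming, and the calling thread only produces. ExecutorAsConsumer is an illustrative name. newSingleThreadExecutor runs tasks one at a time in submission order, so items are consumed in the order they were produced.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// The executor's queue plus its single worker thread play the role of the consumer.
class ExecutorAsConsumer {
    static List<Integer> run(int nItems) throws InterruptedException {
        List<Integer> consumed = new CopyOnWriteArrayList<>();
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            for (int i = 1; i <= nItems; i++) {
                final int item = i;                         // "produce" one item
                consumer.execute(() -> consumed.add(item)); // "consume" it as a task
            }
        } finally {
            consumer.shutdown(); // accept no new tasks, but run the queued ones
        }
        if (!consumer.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("consumer did not finish in time");
        }
        return consumed;
    }
}
```

shutdown() followed by awaitTermination() replaces both the sentinel value and the join() of the hand-written version.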

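Producers and Consumers can be simple classes implementing Runnable (no extends Threaded), which makes them less brittle. Clients can create the Threads themselves and attach the instances, so there is no need for the overhead of a class hierarchy.

Your condition before wait() should be a while(), not an if.

Edit: from JCIP, page 301: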

void stateDependentMethod() throws InterruptedException {
    // condition predicate must be guarded by lock
    synchronized (lock) {
        while (!conditionPredicate())
            lock.wait();
        // object is now in desired state
    }
}

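You have built in a static condition for stopping. Usually producers and consumers should be more flexible: they should be able to react to an external signal to stop.

For starters, to implement an external stop signal, you need a flag: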

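class Producer implements Runnable {
    private volatile boolean stopRequested;
    private volatile Thread runner; // thread executing run(), so stop() can interrupt it

    @Override
    public void run() {
        runner = Thread.currentThread();
        while (true) {
            if (stopRequested) {
                break; // get out of the loop
            }
            // produce one item here
        }
    }

    public void stop() {
        stopRequested = true;
        // interrupt the Producer thread in case it is blocked
        Thread t = runner;
        if (t != null) {
            t.interrupt();
        }
    }
}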

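When you try to implement the above, you may find that other complications arise. For example, your producer first publishes and then wait()s, and that ordering can cause problems.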
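Here is one way to wire the stop flag and the interrupt together, sketched as a self-contained class. StoppableProducer and its produced() counter are hypothetical. The interrupt matters because a producer blocked in BlockingQueue.put() (or in wait()) would never get back to check the flag on its own.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical producer that stops on an external request, even while blocked in put().
class StoppableProducer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private volatile boolean stopRequested;
    private volatile Thread runner;  // thread executing run(), so stop() can interrupt it
    private volatile int produced;   // last item successfully handed over

    StoppableProducer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            for (int i = 1; !stopRequested; i++) {
                queue.put(i);        // blocks while the queue is full
                produced = i;
            }
        } catch (InterruptedException e) {
            // interrupted while blocked: treat it as a stop request
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        stopRequested = true;
        Thread t = runner;
        if (t != null) {
            t.interrupt();           // wakes it up if it is blocked in put()
        }
    }

    int produced() {
        return produced;
    }
}
```

If stop() runs before run() has started, the flag alone is enough: the loop condition is false and run() returns immediately.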

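If you are interested in further reading, I suggest the book Java Concurrency in Practice. It has far more advice than I could add here.

Boundless ambition! You asked this question almost 8 years ago. I hope your efforts have given you (and will keep giving you) the education you were after.

These days, using wait(), notify() and join() to implement multithreading in Java is strongly discouraged. When you try to control concurrency at such a low level, it is very easy to shoot yourself in the foot. In fact, the Java designers have acknowledged that many of Thread's methods and semantics were design mistakes, kept only for backward compatibility; much of this changes with the new "virtual threads" (Project Loom), but that is another topic.

The preferred way to start and control threads manually is now ExecutorService.submit(Callable<V>), which returns a Future<V>. You can then wait for the thread to finish (and get its return value) by calling Future<V>.get(). It returns the value of type V produced by the Callable, or throws an ExecutionException if the Callable threw an uncaught exception.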
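A small sketch of that pattern; FutureDemo is an illustrative name. One Callable returns a value through Future.get(). Another throws, and get() reports this as an ExecutionException whose getCause() is the original exception.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {
    static String run() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // A Callable that returns a value (1 + 2 + ... + 10)
            Future<Integer> sum = executor.submit(() -> {
                int total = 0;
                for (int i = 1; i <= 10; i++) {
                    total += i;
                }
                return total;
            });
            // A Callable that fails; the Future captures the exception
            Callable<Integer> failingTask = () -> {
                throw new IllegalStateException("boom");
            };
            Future<Integer> failing = executor.submit(failingTask);

            StringBuilder out = new StringBuilder();
            out.append("sum = ").append(sum.get()).append('\n'); // get() blocks until done
            try {
                failing.get();
            } catch (ExecutionException e) {
                // getCause() is the exception thrown inside the Callable
                out.append("failed: ").append(e.getCause().getMessage()).append('\n');
            }
            return out.toString();
        } finally {
            executor.shutdown();
        }
    }
}
```

run() returns "sum = 55" followed by "failed: boom".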

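The class below is an example of how to implement something along these lines. It connects any number of producers to any number of consumers through a single bounded blocking queue. The threads' return values are ignored: each task is a lambda that simply returns null. That makes it a Callable, which also lets it throw checked exceptions such as InterruptedException, and the results are kept as Future<?>.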

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class ProducerConsumer<E> {
    private final BlockingQueue<Optional<E>> queue;

    public ProducerConsumer(
            int numProducerThreads, int numConsumerThreads, int queueCapacity) {
        if (numProducerThreads < 1 || numConsumerThreads < 1 || queueCapacity < 1) {
            throw new IllegalArgumentException();
        }
        queue = new ArrayBlockingQueue<Optional<E>>(queueCapacity);
        final ExecutorService executor =
                Executors.newFixedThreadPool(numProducerThreads + numConsumerThreads);
        try {
            // Start producer threads. The counter starts at the full producer count, so a
            // producer that finishes early cannot see it drop to zero prematurely.
            final List<Future<?>> producerFutures = new ArrayList<>();
            final AtomicInteger numLiveProducers = new AtomicInteger(numProducerThreads);
            for (int i = 0; i < numProducerThreads; i++) {
                producerFutures.add(executor.submit(() -> {
                    try {
                        // Run producer
                        producer();
                    } finally {
                        // When the last producer finishes (even by throwing),
                        // deliver one poison pill to each consumer
                        if (numLiveProducers.decrementAndGet() == 0) {
                            for (int j = 0; j < numConsumerThreads; j++) {
                                queue.put(Optional.empty());
                            }
                        }
                    }
                    return null;
                }));
            }
            // Start consumer threads
            final List<Future<?>> consumerFutures = new ArrayList<>();
            for (int i = 0; i < numConsumerThreads; i++) {
                consumerFutures.add(executor.submit(() -> {
                    // Run consumer
                    consumer();
                    return null;
                }));
            }
            // Wait for all producers to complete
            completionBarrier(producerFutures, false);
            // Wait for the consumers to drain the queue and receive their poison pills
            completionBarrier(consumerFutures, false);
        } finally {
            executor.shutdownNow();
        }
    }

    private static void completionBarrier(List<Future<?>> futures, boolean cancel) {
        for (Future<?> future : futures) {
            try {
                if (cancel) {
                    future.cancel(true);
                }
                future.get();
            } catch (CancellationException | InterruptedException e) {
                // Ignore
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected void produce(E val) {
        try {
            queue.put(Optional.of(val));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected Optional<E> consume() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /** Producer loop. Call {@link #produce} for each element. */
    public abstract void producer();

    /**
     * Consumer thread. Call {@link #consume()} to get each successive element,
     * until an empty {@link Optional} is returned.
     */
    public abstract void consumer();
}

Use it as follows:

new ProducerConsumer<Integer>(/* numProducerThreads = */ 1, /* numConsumerThreads = */ 4,
        /* queueCapacity = */ 10) {
    @Override
    public void producer() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Producing " + i);
            produce(i);
        }
    }

    @Override
    public void consumer() {
        for (Optional<Integer> opt; (opt = consume()).isPresent(); ) {
            int i = opt.get();
            System.out.println("Got " + i);
        }
    }
};
