Java 生产者-消费者多线程基准测试.为什么它会停止



请帮我解决问题。

我有java程序 - 测试服务器,类似于echo,一个基准工具。

简化:

我从不同数量的线程(模拟真实世界的生产者,例如,来自 10 个客户端或 1 个客户端,实际上无关紧要)向服务器发送 100 条消息并接收响应。

我对消费者也有同样的情况(消费者=线程)。生产者和使用者生成事件(从...消息发送自...等)

我的主要实现MyEvenListener,使用onMessage()并计算所有内容。

我的问题是我无法接收所有 100 条消息并对其进行计数,因为程序在发送消息后停止。我知道这很简单,但不知道如何解决它。:(

这是我的代码:

public static void main(String[] args) throws InterruptedException {
    Main m = new Main();
    m.init();
}
private int mesReceved=0;
public void init() throws InterruptedException {
    Sender s = new Sender(15,this);
    Resender r = new Resender(15,this);
    r.createThreads();
    r.startThreads();
    s.createThreads();
    s.startThreads();
    System.out.println(mesReceved);
}
public void onEvent(String string) {
    mesReceved++;
}

我认为这段代码没有问题。

您可以尝试简化问题吗,例如减少线程,直到问题停止发生。

在最简单的情况下,例如 4 个线程,发生这种情况的地方

  • 取一个线程堆栈,
  • 使用调试或
  • 添加日志记录以诊断问题。

你实际上是在等待线程完成后才离开main吗?您应该在main结束时对所有创建的线程使用 Thread.join,否则主线程将退出而不等待子线程完成。

你是如何启动你的程序的?它可能成功完成,然后关闭窗口。有几种方法可以解决这个问题。

我想到的一个想法是,你可以在main中尝试一个Thread.sleep(time)。(其中时间是您希望它等待多长时间(以毫秒为单位)。

您的问题在于主线程在设置过程后终止,因此终止所有其他线程。

等待线程完成其工作的一种优雅方法是使用高级同步辅助工具,如 CountDownLatch。

在这种情况下,您可以像这样重写 init 代码:

public void init() throws InterruptedException {
    ...
    producer = r.createThreads();
    r.startThreads();
    consumer = s.createThreads();
    s.startThreads();
    ...
    producer.getCountDownLatch().await();      // wait for all producers to finish
    consumer.getCountDownLatch().await();      // wait for all consumers to finish 
    // note: you could also encapsulate the latch and internally delegate the await
    // producer.await(); // nicer to read
}

在发送方和接收方类上,创建并维护 CountDownLatch:

class Sender {
    final CountDownLatch sync;
    public Sender (int threadCount) {
        sync = new CountDownLatch(threadCount);
        ...
    }
    public CountDownLatch getCountDownLatch() {
        return sync;
    }
    // alternative
    public boolean await() {
        return sync.await();
    }
} 

创建线程时,将 countDownLatch 传递给每个可运行的线程。当它们完成工作时,您递减闩锁:

class MyRunnable implements Runnable {
    private final CountDownLatch latch;
    public MyRunnable(CountDownLatch latch) {
        this.latch = latch;
        ...
    } 
    public void run() {
        // DO STUFF
        latch.countDown();
    }
}

有了此同步过程,程序才会在所有生成者和使用者完成其工作时终止。此外。await方法可以将超时作为参数,以便可以确保程序在某些边界内终止。例如,让所有生产者工作,但只等待消费者 5 分钟:

public void init() {
    ...
    producer.getCountDownLatch().await();      // wait for all producers to finish
    boolean allFinished = consumer.getCountDownLatch().await(5, TimeUnit.MINUTES);      // wait 5 minutes for all consumers to finish.
    if (!allFinished) {
        // alert that not all consumers ended on time
    }
    ...
}

相关内容

  • 没有找到相关文章

最新更新