监听jms队列,一次只处理10条消息



我有一个javax.jms.Queue队列,并让侦听器侦听该队列。我得到消息(字符串)并执行一个进程,将字符串作为输入参数传递给该进程。

我只想同时运行该进程的10个实例。一旦这些消息完成,则只应处理下一个消息。

如何实现?因为它一次读取所有消息并运行该进程的尽可能多的实例,导致服务器被挂起。

// using javax.jms.MessageListener
message = consumer.receive(5000); 
if (message != null) { 
    try { 
        handler.onMessage(message);  //handler is MessageListener instance
    }
}

尝试将此注释放在mdb侦听器上:

@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10")

我假设您有一种接受来自外部进程的hasTerminated消息的方法。该控制器线程将使用信号量与JMS侦听器通信。Semaphore初始化为10个许可,每次外部进程调用TerminationController#terminate(或者外部进程与侦听进程通信)时,它都会向Semaphore添加一个许可,然后JMSListener必须首先获得一个许可才能调用messageConsumer.release(),这确保一次不能超过10个进程处于活动状态。

// created in parent class
private final Semaphore semaphore = new Semaphore(10);
@Controller
public class TerminationController {
    private final semaphore;
    public TerminationController(Semaphore semaphore) {
        this.semaphore = semaphore;
    }
    // Called from external processes when they terminate
    public void terminate() {
        semaphore.release();
    }
}
public class JMSListener implements Runnable {
    private final MessageConsumer messageConsumer;
    private final Semaphore semaphore;
    public JMSListener(MessageConsumer messageConsumer, Semaphore semaphore) {
        this.messageConsumer = messageConsumer;
        this.semaphore = semaphore;
    }
    public void run() {
        while(true) {
            semaphore.acquire();
            Message message = messageConsumer.receive();
            // create process from message
        }
    }
}

我认为一个简单的while检查就足够了。下面是一些伪代码。

While (running processes are less than 10) {
    add one to the running processes list
    do something with the message
}

onMessage代码中的

function declaration of on Message(Parameters) {
    do something
    subtract 1 from the running processes list
}

确保用于计算运行进程数量的变量声明为volatile

请求的示例:

public static volatile int numOfProcesses = 0;
while (true) {
    if (numOfProcesses < 10) {
        // read a message and make a new process, etc
        // probably put your receive code here
        numOfProcesses++;
    }
}

无论你的进程代码写在哪里:

// do stuff, do stuff, do more stuff
// finished stuff
numOfProcesses--;

最新更新