我有一个javax.jms.Queue队列,并让侦听器侦听该队列。我得到消息(字符串)并执行一个进程,将字符串作为输入参数传递给该进程。
我只想同时运行该进程的10个实例。一旦这些消息完成,则只应处理下一个消息。
如何实现?因为它一次读取所有消息并运行该进程的尽可能多的实例,导致服务器被挂起。
// using javax.jms.MessageListener
message = consumer.receive(5000);
if (message != null) {
try {
handler.onMessage(message); //handler is MessageListener instance
}
}
尝试将此注释放在mdb侦听器上:
@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10")
我假设您有一种接受来自外部进程的hasTerminated
消息的方法。该控制器线程将使用信号量与JMS侦听器通信。Semaphore
初始化为10个许可,每次外部进程调用TerminationController#terminate
(或者外部进程与侦听进程通信)时,它都会向Semaphore
添加一个许可,然后JMSListener
必须首先获得一个许可才能调用messageConsumer.release()
,这确保一次不能超过10个进程处于活动状态。
// created in parent class
private final Semaphore semaphore = new Semaphore(10);
@Controller
public class TerminationController {
private final semaphore;
public TerminationController(Semaphore semaphore) {
this.semaphore = semaphore;
}
// Called from external processes when they terminate
public void terminate() {
semaphore.release();
}
}
public class JMSListener implements Runnable {
private final MessageConsumer messageConsumer;
private final Semaphore semaphore;
public JMSListener(MessageConsumer messageConsumer, Semaphore semaphore) {
this.messageConsumer = messageConsumer;
this.semaphore = semaphore;
}
public void run() {
while(true) {
semaphore.acquire();
Message message = messageConsumer.receive();
// create process from message
}
}
}
我认为一个简单的while
检查就足够了。下面是一些伪代码。
While (running processes are less than 10) {
add one to the running processes list
do something with the message
}
和onMessage
代码中的
function declaration of on Message(Parameters) {
do something
subtract 1 from the running processes list
}
确保用于计算运行进程数量的变量声明为volatile
。
请求的示例:
public static volatile int numOfProcesses = 0;
while (true) {
if (numOfProcesses < 10) {
// read a message and make a new process, etc
// probably put your receive code here
numOfProcesses++;
}
}
无论你的进程代码写在哪里:
// do stuff, do stuff, do more stuff
// finished stuff
numOfProcesses--;