等待和通知发送一堆消息



如果有人能解释我如何等待/通知/notifyAll工作,如果有更好的解决方案,我所面临的问题,我会非常感激。基本上,我们得发一堆短信。为了发送消息,使用了一个名为SMPPSession的对象,但在本例中,我只使用简单的代码。SMPPSession应该向SMSC服务器发送消息,并在连接中断时重新建立会话。我想使用多个线程来发送多个消息,并有一个单独的单个线程,某种"监护人"/"监视者"/"通知者"。这个单独的线程的作用是在它重建会话时阻止所有其他线程执行它们的代码。当然,SMPPSession在所有这些线程之间共享。一旦守护线程完成重新连接,所有其他线程需要继续使用会话并继续发送。

现在,我有一些代码和得到异常。任何帮助吗?

在现实中,我们确实使用jsmpp库发送真实的SMS消息,其中有SMPPSession对象。

public class SMPPSession {

private boolean bind;
private static final Random idGenerator = new Random();

public int sendMessage(String msg){
try{
Thread.sleep(1000L);
System.out.println("Sending message: " + msg);
return Math.abs(idGenerator.nextInt());
} catch (InterruptedException e){
e.printStackTrace();
}
return -1;
}

public void reBind(){
try{
System.out.println("Rebinding...");
Thread.sleep(1000L);
this.bind = true;
System.out.println("Session established!");
} catch (InterruptedException e){
e.printStackTrace();
}
}

public boolean isBind(){
return this.bind;
}
}

public class Sender extends Thread{

private SMPPSession smppSession;

public Sender(String name, SMPPSession smppSession){
this.setName(name);
this.smppSession = smppSession;
}

@Override
public void run(){

while (!Client.messages.isEmpty()){

synchronized (Client.messages){

if (smppSession.isBind()){
final String msg = Client.messages.remove(0);
final int msgId = smppSession.sendMessage(msg);
System.out.println(Thread.currentThread().getName() + " sent msg and received msgId: " + msgId);
Client.messages.notifyAll();
} else {
try {
Client.messages.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

}

}
}

public class SessionProducer extends Thread{

private SMPPSession smppSession;

public SessionProducer(String name, SMPPSession smppSession){
this.setName(name);
this.smppSession = smppSession;
}

@Override
public void run(){

while (!Client.messages.isEmpty()){

synchronized (Client.messages){
if (!smppSession.isBind()){
smppSession.reBind();
System.out.println(Thread.currentThread().getName() + " managed to reestablish SMPP session.");
Client.messages.notifyAll();
} else{
try {
Client.messages.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

}

}
}

public class Client {

public static final List<String> messages = new CopyOnWriteArrayList<>();

public static void main(String[] args) {

//populate messages from db
messages.add("msg1"); messages.add("msg2"); messages.add("msg3");  messages.add("msg4"); messages.add("msg5"); messages.add("msg6");

SMPPSession smppSession = new SMPPSession();
SessionProducer sessionProducer = new SessionProducer("SessionProducer1", smppSession);
Sender sender1 = new Sender("Sender1", smppSession);
Sender sender2 = new Sender("Sender2", smppSession);
Sender sender3 = new Sender("Sender3", smppSession);
Sender sender4 = new Sender("Sender4", smppSession);

sessionProducer.start();
sender1.start();
sender2.start();
sender3.start();
sender4.start();

}
}

Naturally, I get exception and have no idea why. Somehow threads are not in sync.


Rebinding...
Session established!
SessionProducer1 managed to reestablish SMPP session.
Sending message: msg1
Sender4 sent msg and received msgId: 432995458
Sending message: msg2
Sender4 sent msg and received msgId: 113629699
Sending message: msg3
Sender4 sent msg and received msgId: 611735717
Sending message: msg4
Sender4 sent msg and received msgId: 1234995659
Sending message: msg5
Sender4 sent msg and received msgId: 922228968
Sending message: msg6
Sender4 sent msg and received msgId: 2097204472
Exception in thread "Sender2" Exception in thread "Sender1" Exception in thread "Sender3" java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
at java.base/java.util.concurrent.CopyOnWriteArrayList.elementAt(CopyOnWriteArrayList.java:385)
at java.base/java.util.concurrent.CopyOnWriteArrayList.remove(CopyOnWriteArrayList.java:478)
at demo.Sender.run(Sender.java:20)
java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
at java.base/java.util.concurrent.CopyOnWriteArrayList.elementAt(CopyOnWriteArrayList.java:385)
at java.base/java.util.concurrent.CopyOnWriteArrayList.remove(CopyOnWriteArrayList.java:478)
at demo.Sender.run(Sender.java:20)
java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
at java.base/java.util.concurrent.CopyOnWriteArrayList.elementAt(CopyOnWriteArrayList.java:385)
at java.base/java.util.concurrent.CopyOnWriteArrayList.remove(CopyOnWriteArrayList.java:478)
at demo.Sender.run(Sender.java:20)

你的循环调用没有同步的Client.messages.isEmpty()。我还没有花时间真正地理解你的代码是做什么的——不管怎么说我也看不清——但是我可以猜测发生了什么。

也许列表中包含一条信息。

  • 四个线程都看到它不是空的
  • 四个线程都试图进入synchronized(Client.messages)块。
  • 一个接一个,他们进入块,看到smppSession.isBind()是真的,并试图从列表中删除一条消息。
  • 第一个删除消息的线程成功,然后其他四个线程抛出异常,因为它试图从空列表中删除。

推荐中国的SMS开发库SMS -client支持Smpp。

<dependency>
<groupId>com.chinamobile.cmos</groupId>
<artifactId>sms-client</artifactId>
<version>0.0.7</version>
</dependency>

public void testsmpp() throws Exception {
SmsClientBuilder builder = new SmsClientBuilder();
SmsClient smsClient = builder.uri("smpp://127.0.0.1:18890?username=test01&password=1qaz2wsx&version=52&window=32&maxchannel=1")
.receiver(new MessageReceiver() {
public void receive(BaseMessage message) {
logger.info("receive : {}",message.toString());
}
}).build();

for (int i = 0; i < 5; i++) {

SubmitSm pdu = new SubmitSm();
pdu.setRegisteredDelivery((byte)1);
pdu.setSourceAddress(new Address((byte)0,(byte)0,"10086"));
pdu.setDestAddress(new Address((byte)0,(byte)0,"13800138000"));
pdu.setSmsMsg(new SmsTextMessage("SmsTextMessage " + i,SmsDcs.getGeneralDataCodingDcs(SmsAlphabet.GSM,SmsMsgClass.CLASS_UNKNOWN)));
try {
smsClient.send(pdu, 1000);
} catch (Exception e) {
logger.info("send ", e);
}
}
Thread.sleep(5000000);
}

最新更新