Java application with TCP sockets is using 100% CPU



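The server has to listen continuously for incoming connections and run some logic on the data it receives. Every time I run the application, CPU usage goes above 90%. At first I thought the while loop might be spinning (busy waiting), but readLine() should be a blocking call, so I don't think that is the cause. Any help is appreciated! Here is the server code: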

public void listen() throws IOException
{
    try( ServerSocket serverSocket = new ServerSocket(port);
         Socket clientSocket = serverSocket.accept();
         DataOutputStream outputStream = new DataOutputStream(clientSocket.getOutputStream());
         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));)
    {
        String data = null;
        while((data = bufferedReader.readLine()) != null)
        {
            Message message = Message.deserializeMessage(data);
            synchronized (PeerNode.requestHistory)
            {
                if(PeerNode.requestHistory.keySet().contains(message))
                {
                    continue;
                }
            }
            if(message.getType() == 0 && message.getHopCount() < 1) {
                continue;
            }
            switch(message.getType()) {
                case 0:
                    synchronized (PeerNode.sharedRequestBuffer) {
                        PeerNode.sharedRequestBuffer.offer(message);
                    }
                    break;
                case 1:
                    synchronized (PeerNode.sharedReplyBuffer) {
                        PeerNode.sharedReplyBuffer.offer(message);
                    }
                    break;
                case 2:
                    synchronized (PeerNode.numberOfItems) {
                        if(PeerNode.numberOfItems > 0) {
                            PeerNode.numberOfItems -= 1;
                        }
                        outputStream.writeBytes("0" + "\n");
                    }
                    break;
            }
            synchronized (PeerNode.requestHistory) {
                PeerNode.requestHistory.put(message, 0);
            }
        }
    }
    catch(Exception ex)
    {
        ex.printStackTrace();
    }
}

Edit: Added the deserialize() method:

public static Message deserializeMessage(String s)
{
    Message m = new Message();
    String[] objArray = s.split("#");
    String[] list = objArray[2].split(",");
    m.setProductName(objArray[0]);
    m.setProductId(Integer.parseInt(objArray[1]));
    List<Integer> tempList = new ArrayList<>();
    for(int i=0; i<list.length; i++)
    {
        if(list[i].length() == 0)
            continue;
        tempList.add(Integer.parseInt(list[i]));
    }
    m.setMessagePath(tempList);
    m.setHopCount(Integer.parseInt(objArray[3]));
    m.setType(Integer.parseInt(objArray[4]));
    m.setRequestId(Integer.parseInt(objArray[5]));
    m.setSourcePeerId(Integer.parseInt(objArray[6]));
    m.setDestinationSellerId(Integer.parseInt(objArray[7]));
    m.setDestinationSellerLocation(Integer.parseInt(objArray[8]));
    return m;
}

Edit 2: Changed deserialize() to use Scanner:

public static Message deserializeMessage(String s)
{
    Message m = new Message();
    Scanner sc = new Scanner(s);
    sc.useDelimiter("#");
    m.setProductName(sc.next());
    m.setProductId(Integer.parseInt(sc.next()));
    List<Integer> tempList = new ArrayList<>();
    Scanner sct = new Scanner(sc.next());
    sct.useDelimiter(",");
    while(sct.hasNext())
    {
        tempList.add(Integer.parseInt(sct.next()));
    }
    m.setMessagePath(tempList);
    m.setHopCount(Integer.parseInt(sc.next()));
    m.setType(Integer.parseInt(sc.next()));
    m.setRequestId(Integer.parseInt(sc.next()));
    m.setSourcePeerId(Integer.parseInt(sc.next()));
    m.setDestinationSellerId(Integer.parseInt(sc.next()));
    m.setDestinationSellerLocation(Integer.parseInt(sc.next()));
    return m;
}

Edit: Updated server code:

private ExecutorService executor = Executors.newFixedThreadPool(15);

public void listen() throws IOException
{
    serverSocket = new ServerSocket(port);
    while (!Thread.interrupted()) {
        try
        {
            // Server, listening...
            clientSocket = serverSocket.accept();
            ServerExecutor serverExecutor = new ServerExecutor(peerID, clientSocket);
            executor.submit(serverExecutor);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
    serverSocket.close();
}

ServerExecutor class:

public ServerExecutor(int _peerID, Socket _clientSocket)
{
    this.peerID = _peerID;
    this.clientSocket = _clientSocket;
}

public void run()
{
    try( DataOutputStream outputStream = new DataOutputStream(clientSocket.getOutputStream());
         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));)
    {
        String data = null;
        while((data = bufferedReader.readLine()) != null)
        {
            Message message = Message.deserializeMessage(data);
            synchronized (PeerNode.requestHistory)
            {
                if(PeerNode.requestHistory.keySet().contains(message))
                {
                    continue;
                }
            }
            if(message.getType() == 0 && message.getHopCount() < 1) {
                continue;
            }
            switch(message.getType()) {
                case 0:
                    synchronized (PeerNode.sharedRequestBuffer) {
                        PeerNode.sharedRequestBuffer.offer(message);
                    }
                    break;
                case 1:
                    synchronized (PeerNode.sharedReplyBuffer) {
                        PeerNode.sharedReplyBuffer.offer(message);
                    }
                    break;
                case 2:
                    synchronized (PeerNode.numberOfItems) {
                        if(PeerNode.numberOfItems > 0) {
                            PeerNode.numberOfItems -= 1;
                        }
                        outputStream.writeBytes("0" + "\n");
                    }
                    break;
            }
            synchronized (PeerNode.requestHistory) {
                PeerNode.requestHistory.put(message, 0);
            }
        }
        clientSocket.close();
    }
    catch(Exception ex)
    {
        ex.printStackTrace();
    }
}

Updated deserialize():

public static Message deserializeMessage(String s)
{
    Message m = new Message();
    Scanner sc = new Scanner(s);
    sc.useDelimiter("#");
    m.setProductName(sc.next());
    m.setProductId(sc.nextInt());
    List<Integer> tempList = new ArrayList<>();
    Scanner sct = new Scanner(sc.next());
    sct.useDelimiter(",");
    while(sct.hasNext())
    {
        tempList.add(sct.nextInt());
    }
    m.setMessagePath(tempList);
    m.setHopCount(sc.nextInt());
    m.setType(sc.nextInt());
    m.setRequestId(sc.nextInt());
    m.setSourcePeerId(sc.nextInt());
    m.setDestinationSellerId(sc.nextInt());
    m.setDestinationSellerLocation(sc.nextInt());
    return m;
}

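Are you using a separate thread for each new client connection? Code like this should never cause such a high CPU load. Is anything else running in the background? Edit: Maybe you could try something like this?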

ServerSocket serverSocket = new ServerSocket(5555); // bind once, outside the accept loop
while (true)
{
    Socket s = null;
    try
    {
        s = serverSocket.accept(); // blocks until a client connects
        DataOutputStream out = new DataOutputStream(s.getOutputStream());
        BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
        // create a new thread for the client
        Thread t = new TaskHandler(s, in, out); // TaskHandler's run() method holds the code you want to execute for each connected client
        t.start();
    }
    catch (Exception e) {
        if (s != null) {
            s.close(); // s is still null if accept() itself failed
        }
        e.printStackTrace();
    }
}
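
To find out whether something else in the JVM is burning the CPU, it may help to look at per-thread CPU time. The following is only a rough sketch using the standard java.lang.management API; the class name ThreadCpuReport and the method cpuMillisByThread are made up for illustration and are not part of your code. A thread that is really blocked in readLine() or accept() should show very little CPU time, while a spinning thread should end up near the top of the list.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (name is an assumption): lists the JVM's live threads
// with their accumulated CPU time, busiest first.
public class ThreadCpuReport {

    // Returns "threadName [STATE]" -> CPU time in milliseconds, sorted descending.
    // Returns an empty list if the JVM does not support or has disabled CPU timing.
    public static List<Map.Entry<String, Long>> cpuMillisByThread() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<Map.Entry<String, Long>> rows = new ArrayList<>();
        if (!mx.isThreadCpuTimeSupported() || !mx.isThreadCpuTimeEnabled()) {
            return rows;
        }
        for (long id : mx.getAllThreadIds()) {
            long nanos = mx.getThreadCpuTime(id);   // -1 if the thread has already died
            ThreadInfo info = mx.getThreadInfo(id); // null if the thread has already died
            if (nanos < 0 || info == null) {
                continue;
            }
            String label = info.getThreadName() + " [" + info.getThreadState() + "]";
            rows.add(new AbstractMap.SimpleEntry<>(label, nanos / 1_000_000));
        }
        rows.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return rows;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Long> row : cpuMillisByThread()) {
            System.out.println(row.getValue() + " ms  " + row.getKey());
        }
    }
}
```

You could call cpuMillisByThread() from a small timer thread inside the server and print the first few rows every couple of seconds; whichever thread keeps growing fastest is the one to look at.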

Does this help?
