如何测试多线程服务器



我有一个主线程服务器,基本上可以监听想要连接到端口的人

/**
 * The main server thread receives request from and sends
 * response to clients.
 *
 * <p>It listens on {@link #PORT} and hands every accepted socket to a
 * {@code ClientThread} that runs on a fixed-size thread pool.
 */
public class Server {
    /** Port number for the client connection. */
    private static final int PORT = 3000;

    /** Size of the worker pool, i.e. how many clients can be served at the same time. */
    private static final int SIZE = 10;

    /** Pool that runs one {@code ClientThread} per accepted connection. */
    private static final ExecutorService executorService = Executors.newFixedThreadPool(SIZE);

    /**
     * Starts the main server.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Messages passed between the client handlers are stored in this queue.
        BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
        // Connected clients are stored in this map.
        // NOTE(review): arrays use identity equals/hashCode, so a byte[] key is only found
        // again through the very same array instance, never through an equal-content copy.
        ConcurrentMap<byte[], Boolean> clientManagement = new ConcurrentHashMap<>();
        runMainServer(messageQueue, clientManagement);
    }

    /**
     * Accepts connections forever and starts a {@code ClientThread} for each of them.
     * Returns only when the server socket fails with an {@link IOException}.
     *
     * @param messageQueue     queue shared by all client handlers
     * @param clientManagement map of connected clients shared by all client handlers
     */
    private static void runMainServer(BlockingQueue<Message> messageQueue,
                                      ConcurrentMap<byte[], Boolean> clientManagement) {
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            System.out.println("Starting server");
            while (true) {
                System.out.println("Waiting for request");
                Socket socket = serverSocket.accept();
                System.out.println("Processing request");
                // execute() rather than submit(): submit() stores an exception thrown by the
                // handler in a Future nobody reads, which hides handler failures completely.
                executorService.execute(new ClientThread(socket, messageQueue, clientManagement));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The pool threads are non-daemon; without this the JVM would stay alive with no
            // listening socket. shutdown() still lets already running handlers finish.
            executorService.shutdown();
        }
    }
}

我还有多个子服务器线程,每个线程负责处理一个客户端。客户端首先被服务器接受,然后服务器检查收到的第一条消息是否为 connect_message。如果是,该客户端就正式连接成功。除了 connect_message 之外,还有其他类型的消息,但这里不再详细说明。

/**
* The client thread.
*/
public class ClientThread implements Runnable {
// Connection to the client; run() opens the object streams on it.
private Socket socket;
// Queue handed in by the creator: handleRequest() adds messages to it, handleClient() takes them from it.
private BlockingQueue<Message> messageQueue;
// Map of connected clients: run() puts the username on connect, handleClient() removes it on disconnect.
private ConcurrentMap<byte[], Boolean> clientManagement;
// Username of this client; a one-byte placeholder until run() stores the name from the ConnectMessage.
private byte[] username;
/**
 * Creates the handler for one accepted client connection.
 *
 * @param socket           the socket connected to the client
 * @param messageQueue     the queue of messages this handler reads from and adds to
 * @param clientManagement the map of connected clients, keyed by username
 */
public ClientThread(Socket socket, BlockingQueue<Message> messageQueue, ConcurrentMap<byte[], Boolean> clientManagement) {
    // Placeholder name; replaced once the client has sent its connect message.
    this.username = new byte[1];
    this.clientManagement = clientManagement;
    this.messageQueue = messageQueue;
    this.socket = socket;
}
/**
 * Runs the whole session of one client: the connect handshake followed by the
 * request loop in {@code handleClient}.
 *
 * <p>The first object received must be a connect message. Its username is stored and
 * registered in {@code clientManagement} and a {@code ConnectResponse} is sent back.
 * Any other first message is answered with a failure message and the session ends
 * with an {@link IllegalArgumentException}.
 *
 * <p>The socket is always closed and the client is always removed from
 * {@code clientManagement} when this method returns, whatever the reason.
 *
 * @throws IllegalArgumentException if the first message is not a connect message
 */
@Override
public void run() {
    // The socket itself is a resource: it must be closed even when creating one of the
    // object streams fails.
    try (Socket client = socket;
         ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream())) {
        // Send the stream header before creating the ObjectInputStream, whose constructor
        // blocks until the peer's header arrives. Creating the input stream first on both
        // ends would deadlock.
        out.flush();
        try (ObjectInputStream in = new ObjectInputStream(client.getInputStream())) {
            Message first = (Message) in.readObject();
            if (first.getIdentifier() != MessageIdentifier.CONNECT_MESSAGE) {
                // Handle failing request
                handleFailedMsg(out, "Client should connect first");
                throw new IllegalArgumentException("Connect unsuccessfully");
            }
            ConnectMessage cm = (ConnectMessage) first;
            this.username = cm.getUsername();
            // Register the exact array kept in this.username: a byte[] key is looked up by
            // identity, so only this instance can find or remove the entry later.
            clientManagement.put(this.username, true);
            byte[] cntMsg = "Successfully Connected!".getBytes();
            out.writeObject(new ConnectResponse(true, cntMsg.length, cntMsg));
            handleClient(in, out);
        }
    } catch (IOException | ClassNotFoundException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread (e.g. the executor).
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        // A client that vanished without a disconnect message must not stay registered.
        // Removing an absent key is a no-op.
        if (username != null) {
            clientManagement.remove(username);
        }
    }
}
/**
 * Serves a connected client until it sends its own disconnect message.
 *
 * <p>Each round first delivers at most one pending message from the shared queue and
 * then reads and handles the next request from the client.
 *
 * @param in  stream the client's requests are read from
 * @param out stream the responses are written to
 * @throws InterruptedException kept in the signature for callers that catch it; the
 *                              current implementation does not block on the queue
 * @throws IOException            if reading from or writing to the client fails
 * @throws ClassNotFoundException if the class of a received object cannot be found
 */
private void handleClient(ObjectInputStream in, ObjectOutputStream out)
        throws InterruptedException, IOException, ClassNotFoundException {
    while (true) {
        // poll() instead of take(): take() blocks while the queue is empty, so the client's
        // request below would never be read, and since requests are what fill the queue,
        // every handler would wait forever.
        Message msgFromQueue = messageQueue.poll();
        if (msgFromQueue != null) {
            handleQueueRequest(msgFromQueue, out);
        }
        // Handle request obtained by user
        Message request = (Message) in.readObject();
        // Handle disconnect
        if (request.getIdentifier() == MessageIdentifier.DISCONNECT_MESSAGE) {
            DisconnectMessage dm = (DisconnectMessage) request;
            // If the message is not for this thread, then put it back and ignore it.
            if (!Arrays.equals(username, dm.getUsername())) {
                messageQueue.add(request);
                continue;
            }
            // Look the client up through this.username, not dm.getUsername(): the array in
            // the message is a different instance and a byte[] map key is matched by
            // identity, so it would never be found. remove() also does check and removal
            // in one atomic step.
            // NOTE(review): assumes the key registered on connect is this same array - confirm.
            if (clientManagement.remove(username) == null) {
                handleFailedMsg(out, "The client doesn't exist");
            }
            // Create disconnect response
            byte[] message = "See you again".getBytes();
            // Write to the client
            out.writeObject(new DisconnectResponse(true, message.length, message));
            break;
        }
        // Handle other
        if (!handleRequest(request, out)) {
            handleFailedMsg(out, "The request failed due to incorrect username.");
        }
    }
}
/**
 * Handles one request received from this client.
 *
 * <p>Broadcast, direct and insult messages are put on the shared queue; a query for the
 * connected users is answered directly with a {@code QueryResponse}. Requests with any
 * other identifier are ignored.
 *
 * @param request the request read from the client
 * @param out     stream the query response is written to
 * @return {@code false} if the username carried by the request is not this client's
 *         username, {@code true} otherwise
 * @throws IOException if writing the query response fails
 */
private boolean handleRequest(Message request, ObjectOutputStream out) throws IOException {
    switch (request.getIdentifier()) {
        // If broadcast, then every one should know
        case BROADCAST_MESSAGE:
            return enqueueIfOwn(((BroadcastMessage) request).getUsername(), request);
        // If user wants to send a direct message to the other user
        case DIRECT_MESSAGE:
            return enqueueIfOwn(((DirectMessage) request).getUsername(), request);
        // If user wants to send an insult to the other user and broadcast to the chat room
        case SEND_INSULT:
            return enqueueIfOwn(((SendInsultMessage) request).getUsername(), request);
        // If user want the list of connected users
        case QUERY_CONNECTED_USERS:
            QueryUsersMessage query = (QueryUsersMessage) request;
            if (!Arrays.equals(username, query.getUsername())) {
                return false;
            }
            List<Pair<Integer, byte[]>> userList = new ArrayList<>();
            for (byte[] connectedUser : clientManagement.keySet()) {
                userList.add(new Pair<>(connectedUser.length, connectedUser));
            }
            // Take the count from the snapshot just built: the map can change concurrently,
            // so asking it again could report a number that disagrees with the list.
            out.writeObject(new QueryResponse(userList.size(), userList));
            return true;
        default:
            return true;
    }
}

/**
 * Puts {@code request} on the shared queue if {@code sender} is this client's username.
 *
 * @param sender  username carried by the request
 * @param request the request to forward
 * @return {@code true} if the request was queued, {@code false} if the username did not match
 */
private boolean enqueueIfOwn(byte[] sender, Message request) {
    if (!Arrays.equals(username, sender)) {
        return false;
    }
    messageQueue.add(request);
    return true;
}
/**
 * Sends a {@code FailedMessage} carrying the given description to the client.
 *
 * @param out         stream the failure message is written to
 * @param description human-readable reason, sent as the bytes of this string
 * @throws IOException if writing to the stream fails
 */
public void handleFailedMsg(ObjectOutputStream out, String description) throws IOException {
    final byte[] payload = description.getBytes();
    out.writeObject(new FailedMessage(payload.length, payload));
}
/**
 * Processes one message taken from the shared queue on behalf of this client.
 *
 * <p>Insult and broadcast messages are written to the client unless its username is
 * already listed in the message's other users, and go back on the queue while fewer
 * users are listed than clients are registered. A direct message is written only to
 * its recipient and otherwise put back on the queue. Other messages are dropped.
 *
 * @param request the message taken from the queue
 * @param out     stream to this client
 * @throws IOException if writing to the client fails
 */
public void handleQueueRequest(Message request, ObjectOutputStream out) throws IOException {
    switch (request.getIdentifier()) {
        case DIRECT_MESSAGE: {
            DirectMessage direct = (DirectMessage) request;
            // Not addressed to this user: return it to the queue untouched.
            if (!Arrays.equals(username, direct.getRecipientUsername())) {
                messageQueue.add(direct);
                break;
            }
            out.writeObject(direct);
            break;
        }
        case BROADCAST_MESSAGE: {
            BroadcastMessage broadcast = (BroadcastMessage) request;
            // Deliver only once per user.
            boolean alreadyDelivered = broadcast.getOtherUsers().contains(username);
            if (!alreadyDelivered) {
                out.writeObject(broadcast);
                broadcast.addUsers(username);
            }
            // Keep the message circulating until every registered client is listed.
            boolean everyoneServed = broadcast.getOtherUsers().size() >= clientManagement.size();
            if (!everyoneServed) {
                messageQueue.add(broadcast);
            }
            break;
        }
        case SEND_INSULT: {
            SendInsultMessage insult = (SendInsultMessage) request;
            // Deliver only once per user.
            boolean alreadyDelivered = insult.getOtherUsers().contains(username);
            if (!alreadyDelivered) {
                out.writeObject(insult);
                insult.addUsers(username);
            }
            // Keep the message circulating until every registered client is listed.
            boolean everyoneServed = insult.getOtherUsers().size() >= clientManagement.size();
            if (!everyoneServed) {
                messageQueue.add(insult);
            }
            break;
        }
        default:
            break;
    }
}

我想为我的服务器做 JUnit 测试。测试这样的多线程服务器的最佳方法是什么?

这是我正在尝试的 JUnit 测试代码。我首先启动一个处理已被服务器接受的连接的线程,然后启动一个客户端,假装该客户端正在向服务器发送消息。我想先尝试发送一个 connect_message,看看连接是如何工作的。但到目前为止,这个 JUnit 测试似乎没有任何响应:它只是一直运行,什么也没发生。

public class ClientThreadTest {
// Thread that runs the ClientThread under test.
private Thread foo;
// Not used by the test code visible here.
private List<ClientThread> clientList;
// Queue and map handed to the ClientThread; both are re-created in setUp().
private BlockingQueue<Message> messageQueue;
private ConcurrentMap<byte[], Boolean> clientManagement;
// Port the test's server socket listens on and the test client connects to.
private static final int PORT = 3000;
/** Gives every test its own empty client map and message queue. */
@Before
public void setUp() throws Exception {
    this.clientManagement = new ConcurrentHashMap<>();
    this.messageQueue = new LinkedBlockingQueue<>();
}
/**
 * Connects one client to a {@code ClientThread} and checks that a connect message is
 * answered with the "Successfully Connected!" response.
 *
 * @throws IOException            if the sockets or streams fail
 * @throws ClassNotFoundException if the response class cannot be found
 */
@Test
public void run() throws IOException, ClassNotFoundException {
    // The client must connect BEFORE accept() is called: accept() blocks the calling
    // thread until a connection exists, and the client is created on this same thread.
    // The listening socket queues the pending connection, so accept() then returns at once.
    try (ServerSocket serverSocket = new ServerSocket(PORT);
         Socket fooClient = new Socket("localhost", PORT);
         Socket accepted = serverSocket.accept()) {
        foo = new Thread(new ClientThread(accepted, messageQueue, clientManagement));
        foo.start();

        ObjectOutputStream out = new ObjectOutputStream(fooClient.getOutputStream());
        // Make sure the stream header is sent before waiting for the server's header.
        out.flush();
        ObjectInputStream in = new ObjectInputStream(fooClient.getInputStream());

        // First test Connection message
        byte[] username = "foo".getBytes();
        out.writeObject(new ConnectMessage(username.length, username));

        ConnectResponse m = (ConnectResponse) in.readObject();
        String actual = Arrays.toString(m.getMessage());
        System.out.println(actual);

        // The message need to get
        String expected = Arrays.toString("Successfully Connected!".getBytes());
        if (!expected.equals(actual)) {
            throw new AssertionError("Unexpected connect response: " + actual);
        }
    } finally {
        // Do not leave the handler thread blocked after the test has finished.
        if (foo != null) {
            foo.interrupt();
        }
    }
}

我已经解决了我自己的问题!

适用于在多线程服务器上进行 JUnit 测试的任何人。这是我的建议:

您必须在开始时启动主服务器,然后再进行其他任何事情。让你的主服务器监听你给它的一些端口。

然后你必须启动你的客户端,你必须给它一个与你给主服务器的相同的端口号

最后但并非最不重要的一点是,您可以启动线程来处理特定的客户端。如果我把线程实例化为 ClientThread foo 并直接调用 foo.run(),它将无法正常工作:直接调用 run() 会在当前测试线程中同步执行,测试线程因此被阻塞。我必须实例化一个 Thread foo,把 new ClientThread(...) 作为 Thread 构造函数的参数传入,然后调用 foo.start(),这样 ClientThread 才会在新线程中运行!

现在它正在工作!

最新更新