有两个线程监视套接字的输出流和输入流是否相互影响



我用 Java 编写了一个客户端-服务器程序。服务器可以接收来自多个客户端的消息,这部分工作正常。接下来,我想把这些客户端发来的消息广播给所有其他已连接的客户端,但这一步没有按预期运行。

我尝试通过服务器类中的 for 循环广播收到的消息:

/**
 * Writes {@code message} as a single line to every socket currently held in
 * {@code clients}.
 *
 * The per-client writer is deliberately NOT closed: closing a stream obtained
 * from a socket closes the socket itself, which would disconnect the client
 * after the first broadcast.
 *
 * @param message the text line to send to each client
 */
private void broadcastMessage(String message) {
    // Strictly less than size(): the original "<=" indexed one element past the end.
    for (int i = 0, j = clients.size(); i < j; i++) {
        Socket socket = clients.get(i);
        try {
            // Auto-flushing writer, so println() pushes the line out immediately.
            PrintWriter out = new PrintWriter(new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())), true);
            // WHERE YOU ISSUE THE COMMANDS
            out.println(message);
            Log.d("SERVER Loop", "Broadcasting messages...");
        } catch (IOException e) {
            // Skip this client and keep going; one bad socket must not abort the broadcast.
            Log.e("SERVER Loop", "Could not write to client", e);
        }
    }
    Log.d("SERVER", "Message Brodcasted");
}

然后,我尝试通过客户端类中的侦听器接收:

    /**
     * Reads text lines from the server socket on a background thread and appends
     * each one to {@code msgFromOtherClients} via {@code handler}.
     *
     * The loop ends when the stream reports end-of-input (readLine() returns
     * null) or when reading fails; a failure is logged and clears
     * {@code connected}.
     */
    public class ClientThreadListener implements Runnable {
        protected Socket serverSocket = null;
        protected String mMsgFromServer;

        /**
         * @param serverSocket the already-connected socket to read from
         */
        public ClientThreadListener(Socket serverSocket) {
            this.serverSocket = serverSocket;
        }

        @Override
        public void run() {
            try {
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        serverSocket.getInputStream()));
                while ((mMsgFromServer = in.readLine()) != null) {
                    // Snapshot the line: the posted Runnable runs later, by which time
                    // the field may already hold the next line (or null).
                    final String received = mMsgFromServer;
                    Log.d("MESSAGE FROM SERVER: ", received);
                    handler.post(new Runnable() {
                        @Override
                        public void run() {
                            // '\n' (newline), not the letter 'n'.
                            msgFromOtherClients.append('\n'
                                    + "Message From Server: " + received);
                        }
                    });
                }
            } catch (Exception e) {
                Log.e("ClientListener", "C: Error", e);
                connected = false;
            }
        }
    }

我没有收到任何错误,应用也没有强制关闭。我知道代码很乱,请多包涵,还请专注于手头的问题 :D

下面是服务器类的完整代码

public class Server extends Activity {
private TextView serverStatus;
// DEFAULT IP
public static String SERVERIP = "10.0.2.15";
// DESIGNATE A PORT
public static final int SERVERPORT = 8080;
private Handler handler = new Handler();
private ServerSocket serverSocket;
private String mMsgFromClient;
private MultiThreadedServer server;
private ArrayList<Socket> clients = new ArrayList<Socket>();
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.server);
    serverStatus = (TextView) findViewById(R.id.server_status);
    // SERVERIP = getLocalIpAddress();
    server = new MultiThreadedServer(8080);
    new Thread(server).start();
}
public class MultiThreadedServer implements Runnable {
    protected int serverPort = 8080;
    protected ServerSocket serverSocket = null;
    protected boolean isStopped = false;
    protected Thread runningThread = null;
    public MultiThreadedServer(int port) {
        this.serverPort = port;
    }
    public void run() {
        synchronized (this) {
            this.runningThread = Thread.currentThread();
        }
        openServerSocket();
        while (!isStopped()) {
            Socket clientSocket = null;
            try {
                clientSocket = this.serverSocket.accept();
                clients.add(clientSocket);
            } catch (IOException e) {
                if (isStopped()) {
                    Log.d("SERVER TEXT", "Server Stopped.");
                    return;
                }
                throw new RuntimeException(
                        "Error accepting client connection", e);
            }
            new Thread(new WorkerRunnable(clientSocket, this)).start();
        }
        Log.d("SERVER TEXT", "Server Stopped.");
    }
    private synchronized boolean isStopped() {
        return this.isStopped;
    }
    public synchronized void stop() {
        this.isStopped = true;
        try {
            this.serverSocket.close();
        } catch (IOException e) {
            throw new RuntimeException("Error closing server", e);
        }
    }
    private void openServerSocket() {
        try {
            this.serverSocket = new ServerSocket(this.serverPort);
        } catch (IOException e) {
            throw new RuntimeException("Cannot open port 8080", e);
        }
    }
    private void broadcastMessage(String message) {
        for (int i = 0, j = clients.size(); i <= j; i++) {
            PrintWriter out = null;
            Socket socket = clients.get(i);
            try {
                out = new PrintWriter(new BufferedWriter(
                        new OutputStreamWriter(socket.getOutputStream())), true);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            // WHERE YOU ISSUE THE COMMANDS
            out.println(message);
            Log.d("SERVER Loop", "Broadcasting messages...");
            out.close();
        }
        Log.d("SERVER", "Message Brodcasted");
    }
}
public class WorkerRunnable implements Runnable {
    protected Socket clientSocket = null;
    protected String mMsgFromClient = null;
    private UUID id;
    public WorkerRunnable(Socket clientSocket, MultiThreadedServer server) {
        this.clientSocket = clientSocket;
        id = UUID.randomUUID();
    }
    public void run() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    clientSocket.getInputStream()));
            while ((mMsgFromClient = in.readLine()) != null) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        serverStatus.append('n'
                                + "Message From Client ID " + getID()
                                + ": " + mMsgFromClient);
                    }
                });
            }
            Log.d("SERVERTEXT", "Proceed to broadcast");
            server.broadcastMessage(mMsgFromClient);
        } catch (IOException e) {
            Handler handler = new Handler();
            handler.post(new Runnable() {
                @Override
                public void run() {
                    serverStatus
                            .append('n'
                                    + "Message From Client ID "
                                    + getID()
                                    + ": "
                                    + "Oops. Connection interrupted. Please reconnect your phones.");
                }
            });
            e.printStackTrace();
        }
    }
    private String getID() {
        return id.toString();
    }
}
}

下面是客户端类的完整代码

public class Client extends Activity {
private EditText serverIp;
private EditText chatMsg;
private Button connectPhones;
private Button sendMsg;
private TextView msgFromOtherClients;
private String serverIpAddress = "";
private boolean connected = false;
private boolean willSendMsg = false;
private Handler handler = new Handler();
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.client);
    serverIp = (EditText) findViewById(R.id.server_ip);
    connectPhones = (Button) findViewById(R.id.connect_phones);
    connectPhones.setOnClickListener(connectListener);
    chatMsg = (EditText) findViewById(R.id.chat_msg);
    sendMsg = (Button) findViewById(R.id.send_msg);
    sendMsg.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View v) {
            willSendMsg = true;
        }
    });
    msgFromOtherClients = (TextView) findViewById(R.id.msg_from_other_clients);
}
private OnClickListener connectListener = new OnClickListener() {
    @Override
    public void onClick(View v) {
        if (!connected) {
            serverIpAddress = serverIp.getText().toString();
            if (!serverIpAddress.equals("")) {
                Thread cThread = new Thread(new ClientThread());
                cThread.start();
            }
        }
    }
};
public class ClientThread implements Runnable {
    public void run() {
        try {
            InetAddress serverAddr = InetAddress.getByName(serverIpAddress);
            Log.d("ClientActivity", "C: Connecting...");
            Socket socket = new Socket(serverAddr, Server.SERVERPORT);
            connected = true;
            Thread listener = new Thread(new ClientThreadListener(socket)));
            listener.start();
            while (connected) {
                if (willSendMsg) {
                    willSendMsg = false;
                    try {
                        Log.d("ClientActivity", "C: Sending command.");
                        PrintWriter out = new PrintWriter(
                                new BufferedWriter(new OutputStreamWriter(
                                        socket.getOutputStream())), true);
                        // WHERE YOU ISSUE THE COMMANDS
                        out.println(chatMsg.getText().toString());
                        Log.d("ClientActivity", "C: Sent.");
                    } catch (Exception e) {
                        Log.e("ClientActivity", "S: Error", e);
                    }
                }
            }
            socket.close();
            Log.d("ClientActivity", "C: Closed.");
        } catch (Exception e) {
            Log.e("ClientActivity", "C: Error", e);
            connected = false;
        }
    }
}
public class ClientThreadListener implements Runnable {
    protected Socket serverSocket = null;
    protected String mMsgFromServer;
    public ClientThreadListener(Socket serverSocket) {
        this.serverSocket = serverSocket;
    }
    public void run() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    serverSocket.getInputStream()));
            while ((mMsgFromServer = in.readLine()) != null) {
                Log.d("MESSAGE FROM SERVER: ", mMsgFromServer);
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        msgFromOtherClients.append('n'
                                + "Message From Server: " + mMsgFromServer);
                    }
                });
            }
        } catch (Exception e) {
            Log.e("ClientListener", "C: Error", e);
            connected = false;
        }
    }
}
}

如您所见,客户端有两个线程在使用同一个连接到服务器的套接字(一个负责写入,一个负责读取)。我不明白到底发生了什么:为什么服务器无法广播消息,或者为什么客户端收不到消息。

关闭套接字,或者关闭套接字的输入流或输出流,都会同时关闭另一个流以及套接字本身。请删除 out.close()。

TCP 套接字是全双工的,所以你正在做的事情本质上没有任何问题,不过我不会把"写入套接字"称为"监视"它。

最新更新