Java Sockets. Hanging on ObjectInputStream()



我目前正在使用java套接字编写一个使用paxos共识算法的P2P应用程序,并且遇到了一个问题,我的程序似乎挂在ObjectOutputStream()上。

P2P网络中的每个成员都有一个所有传出连接的列表,以及一个处理传入连接的服务器线程。然后,这个服务器线程有一个serversocket,它为每个接受的传入套接字生成一个处理程序线程。

首先,这是我的Member类。这里的关键功能是sendProposalToAll()sendResponse()

/**
 * Represents a member in the P2P member network
 */
public class Member {
    private Integer portNumber;
    private String memberName;
    private List<Socket> clientSockets = new ArrayList<Socket>();
    private MemberServerThread serverThread;
    /**
     * Constructs the member with given portnumber
     * Spawns thread for the member and initialises all client sockets
     * 
     * @param memberName the name of the member
     * @param portNumber the port number for the member to use for it's client and
     *                   server sockets
     * @throws IOException if an I/O error occurs when creating a client socket or
     *                     opening the server socket
     */
    public Member(String memberName, Integer portNumber) throws IOException {
        this.memberName = memberName;
        this.portNumber = portNumber;
        // initialise clients
        for (Integer port : MemberNetwork.getOtherMemberPorts(portNumber)) {
            clientSockets.add(new Socket("localhost", port));
        }
        // initialise server
        serverThread = new MemberServerThread(portNumber);
        serverThread.start();
    }
    /**
     * Creates a new socket connection to the new member of the P2P network
     * 
     * @param newMemberPort the port of the new member added to the P2P network
     * @throws UnknownHostException if the IP address of the host could not be
     *                              determined
     * @throws IOException          if an I/O error occurs when creating the socket
     */
    private void addNewMember(Integer newMemberPort) throws UnknownHostException, IOException {
        clientSockets.add(new Socket("localhost", newMemberPort));
    }
    /**
     * 
     * @return the member name
     */
    public String getMemberName() {
        return memberName;
    }
    /**
     * 
     * @return the member port number
     */
    public Integer getPortNumber() {
        return portNumber;
    }
    /**
     * Creates a new socket connection to the new member of the P2P network
     * 
     * @param newMemberPort the port of the new member added to the P2P network
     * @throws UnknownHostException if the IP address of the host could not be
     *                              determined
     * @throws IOException          if an I/O error occurs when creating the socket
     */
    public void createNewClientSocket(Integer newMemberPort) throws UnknownHostException, IOException {
        addNewMember(newMemberPort);
    }
    /**
     * Invokes the client thread to send the proposal to all other members of the
     * P2P network
     * 
     * @param proposal the proposal to send to the other members on the P2P network
     * @throws IOException if an I/O error occurs when creating the output stream or
     *                     if the socket is not connected or if an I/O error occurs
     *                     while writing stream header
     */
    public void sendProposalToAll(PropositionType type, String proposedMember) throws IOException {
        Proposal proposal = new Proposal(portNumber, type, proposedMember);
        for (Socket socket : clientSockets) {
            // create object output stream from the socket
            OutputStream outputStream = socket.getOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
            objectOutputStream.flush();
            // send the proposal
            System.out.println(
                    "Sending proposal to " + MemberNetwork.getMemberByPortNumber(socket.getPort()).getMemberName());
            objectOutputStream.writeObject(proposal);
        }
    }
    /**
     * Invokes the client thread to send the response to the desired member of the
     * P2P network
     * 
     * @param type       the type of response
     * @param proposalId the proposal Id that the response refers to
     * @param memberName the name of the member that the response is targetting
     * @throws IOException     if an I/O error occurs when creating the output
     *                         stream or if the socket is not connected or if an I/O
     *                         error occurs while writing stream header
     * @throws SocketException if the specified destination port does not match any
     *                         server socket
     */
    public void sendReponse(ResponseType type, int proposalId, String memberName) throws SocketException, IOException {
        Response response = new Response(type, proposalId, portNumber,
                MemberNetwork.getMemberByName(memberName).getPortNumber());
        // find socket with required destination port
        Socket socket = clientSockets
                .stream()
                .filter(
                        clientSocket -> response
                                .getDestinationPort()
                                .equals(clientSocket.getPort()))
                .findFirst()
                .orElse(null);
        if (socket == null) {
            throw new SocketException("Server socket on port: " + response.getDestinationPort() + "does not exist");
        }
        // create object output stream from the socket
        OutputStream outputStream = socket.getOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        objectOutputStream.flush();
        outputStream.flush();
        // send the proposal
        System.out.println(
                "Sending response to " + MemberNetwork.getMemberByPortNumber(socket.getPort()).getMemberName());
        objectOutputStream.writeObject(response);
    }
}

接下来是如下所示的服务器线程,它只是生成处理程序

/**
 * The Member's Server Thread
 * Responsible for handling the member's server socket and connected clients
 */
public class MemberServerThread extends Thread {
    
    private ServerSocket serverSocket;
    /**
     * Initialises the member's server thread by creating
     * a server socket which listens on the given port number
     * 
     * @param portNumber the port number for the server to bind to
     * @throws IOException if an I/O error occurs when opening the socket.
     */
    public MemberServerThread(Integer portNumber) throws IOException {
        serverSocket = new ServerSocket(portNumber);
    }

    // run the thread
    public void run() {
        
        // connect to peers
        while (true) {
            try {
                new IncomingMemberHandler(serverSocket.accept()).start();
                
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

最后,下面是最后一个相关文件,其中删除了一些内容以显示问题。

/**
 * Handles the individual sockets connected to the member's server socket
 */
public class IncomingMemberHandler extends Thread {
    private Socket incomingMember;
    private Map<Integer, Integer> acceptedPromises = new HashMap<Integer, Integer>();
    /**
     * Initialises a member handler
     * 
     * @param memberSocket the incoming member socket connected to the server
     */
    public IncomingMemberHandler(Socket memberSocket) {
        this.incomingMember = memberSocket;
    }
    public void run() {
        try {
            // create object input stream from the socket
            InputStream inputStream = incomingMember.getInputStream();
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
            Object incomingObject = objectInputStream.readObject();
            // obtain the proposal
            if (incomingObject.getClass().equals(Proposal.class)) {
                Proposal incomingProposal = (Proposal) incomingObject;
                int proposalId = incomingProposal.getId();
                // get proposer port
                Integer proposerPort = incomingMember.getLocalPort();
                // get the proposer
                Member proposer = MemberNetwork.getMemberByPortNumber(proposerPort);
                String proposerName = proposer.getMemberName();
                // get the acceptor
                Member acceptor = MemberNetwork.getMemberByPortNumber(incomingProposal.getSourcePort());
                acceptor.sendReponse(ResponseType.PROMISE_ACCEPTED, proposalId, proposerName);
                System.out.println("Received " + incomingProposal.getType() + " from " + proposerName);
                // obtain the response
            } else if (incomingObject.getClass().equals(Response.class)) {
                Response incomingResponse = (Response) incomingObject;
                // get the acceptor
                Member acceptor = MemberNetwork.getMemberByPortNumber(incomingResponse.getSourcePort());
                String acceptorName = acceptor.getMemberName();
                acceptedPromises.merge(incomingResponse.getId(), 1, Integer::sum);
                System.out.println("Received " + incomingResponse.getType() + " from " + acceptorName);
            } else {
                throw new ClassNotFoundException("Unexpected Class: " + incomingObject.getClass());
            }
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

一切都像预期的那样工作,除了sendResponse()IncomingMemberHandler中被击中。我试过调试,发现它确实成功地运行了sendResponse()函数,但是,一旦

ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);

点击,程序挂起,不再继续。

我希望有人能帮我找出问题是什么,因为它把我难住了。(

在向输出流写入对象后,您既没有flush()也没有close(),这意味着它可能仍然在服务器的内存中,等待在发送它之前出现更多的东西。TCP/IP有开销;一次一个字节地发送每个字节是非常低效的,这就是为什么存在刷新的概念。

最新更新