我目前正在使用java套接字编写一个使用paxos共识算法的P2P应用程序,并且遇到了一个问题,我的程序似乎挂在ObjectOutputStream()上。
P2P网络中的每个成员都有一个所有传出连接的列表,以及一个处理传入连接的服务器线程。然后,这个服务器线程有一个serversocket,它为每个接受的传入套接字生成一个处理程序线程。
首先,这是我的Member类。这里的关键功能是sendProposalToAll()
和sendResponse()
。
/**
* Represents a member in the P2P member network
*/
public class Member {
private Integer portNumber;
private String memberName;
private List<Socket> clientSockets = new ArrayList<Socket>();
private MemberServerThread serverThread;
/**
* Constructs the member with given portnumber
* Spawns thread for the member and initialises all client sockets
*
* @param memberName the name of the member
* @param portNumber the port number for the member to use for it's client and
* server sockets
* @throws IOException if an I/O error occurs when creating a client socket or
* opening the server socket
*/
public Member(String memberName, Integer portNumber) throws IOException {
this.memberName = memberName;
this.portNumber = portNumber;
// initialise clients
for (Integer port : MemberNetwork.getOtherMemberPorts(portNumber)) {
clientSockets.add(new Socket("localhost", port));
}
// initialise server
serverThread = new MemberServerThread(portNumber);
serverThread.start();
}
/**
* Creates a new socket connection to the new member of the P2P network
*
* @param newMemberPort the port of the new member added to the P2P network
* @throws UnknownHostException if the IP address of the host could not be
* determined
* @throws IOException if an I/O error occurs when creating the socket
*/
private void addNewMember(Integer newMemberPort) throws UnknownHostException, IOException {
clientSockets.add(new Socket("localhost", newMemberPort));
}
/**
*
* @return the member name
*/
public String getMemberName() {
return memberName;
}
/**
*
* @return the member port number
*/
public Integer getPortNumber() {
return portNumber;
}
/**
* Creates a new socket connection to the new member of the P2P network
*
* @param newMemberPort the port of the new member added to the P2P network
* @throws UnknownHostException if the IP address of the host could not be
* determined
* @throws IOException if an I/O error occurs when creating the socket
*/
public void createNewClientSocket(Integer newMemberPort) throws UnknownHostException, IOException {
addNewMember(newMemberPort);
}
/**
* Invokes the client thread to send the proposal to all other members of the
* P2P network
*
* @param proposal the proposal to send to the other members on the P2P network
* @throws IOException if an I/O error occurs when creating the output stream or
* if the socket is not connected or if an I/O error occurs
* while writing stream header
*/
public void sendProposalToAll(PropositionType type, String proposedMember) throws IOException {
Proposal proposal = new Proposal(portNumber, type, proposedMember);
for (Socket socket : clientSockets) {
// create object output stream from the socket
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.flush();
// send the proposal
System.out.println(
"Sending proposal to " + MemberNetwork.getMemberByPortNumber(socket.getPort()).getMemberName());
objectOutputStream.writeObject(proposal);
}
}
/**
* Invokes the client thread to send the response to the desired member of the
* P2P network
*
* @param type the type of response
* @param proposalId the proposal Id that the response refers to
* @param memberName the name of the member that the response is targetting
* @throws IOException if an I/O error occurs when creating the output
* stream or if the socket is not connected or if an I/O
* error occurs while writing stream header
* @throws SocketException if the specified destination port does not match any
* server socket
*/
public void sendReponse(ResponseType type, int proposalId, String memberName) throws SocketException, IOException {
Response response = new Response(type, proposalId, portNumber,
MemberNetwork.getMemberByName(memberName).getPortNumber());
// find socket with required destination port
Socket socket = clientSockets
.stream()
.filter(
clientSocket -> response
.getDestinationPort()
.equals(clientSocket.getPort()))
.findFirst()
.orElse(null);
if (socket == null) {
throw new SocketException("Server socket on port: " + response.getDestinationPort() + "does not exist");
}
// create object output stream from the socket
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.flush();
outputStream.flush();
// send the proposal
System.out.println(
"Sending response to " + MemberNetwork.getMemberByPortNumber(socket.getPort()).getMemberName());
objectOutputStream.writeObject(response);
}
}
接下来是如下所示的服务器线程,它只是生成处理程序
/**
* The Member's Server Thread
* Responsible for handling the member's server socket and connected clients
*/
public class MemberServerThread extends Thread {
private ServerSocket serverSocket;
/**
* Initialises the member's server thread by creating
* a server socket which listens on the given port number
*
* @param portNumber the port number for the server to bind to
* @throws IOException if an I/O error occurs when opening the socket.
*/
public MemberServerThread(Integer portNumber) throws IOException {
serverSocket = new ServerSocket(portNumber);
}
// run the thread
public void run() {
// connect to peers
while (true) {
try {
new IncomingMemberHandler(serverSocket.accept()).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
最后,下面是最后一个相关文件,其中删除了一些内容以显示问题。
/**
* Handles the individual sockets connected to the member's server socket
*/
public class IncomingMemberHandler extends Thread {
private Socket incomingMember;
private Map<Integer, Integer> acceptedPromises = new HashMap<Integer, Integer>();
/**
* Initialises a member handler
*
* @param memberSocket the incoming member socket connected to the server
*/
public IncomingMemberHandler(Socket memberSocket) {
this.incomingMember = memberSocket;
}
public void run() {
try {
// create object input stream from the socket
InputStream inputStream = incomingMember.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
Object incomingObject = objectInputStream.readObject();
// obtain the proposal
if (incomingObject.getClass().equals(Proposal.class)) {
Proposal incomingProposal = (Proposal) incomingObject;
int proposalId = incomingProposal.getId();
// get proposer port
Integer proposerPort = incomingMember.getLocalPort();
// get the proposer
Member proposer = MemberNetwork.getMemberByPortNumber(proposerPort);
String proposerName = proposer.getMemberName();
// get the acceptor
Member acceptor = MemberNetwork.getMemberByPortNumber(incomingProposal.getSourcePort());
acceptor.sendReponse(ResponseType.PROMISE_ACCEPTED, proposalId, proposerName);
System.out.println("Received " + incomingProposal.getType() + " from " + proposerName);
// obtain the response
} else if (incomingObject.getClass().equals(Response.class)) {
Response incomingResponse = (Response) incomingObject;
// get the acceptor
Member acceptor = MemberNetwork.getMemberByPortNumber(incomingResponse.getSourcePort());
String acceptorName = acceptor.getMemberName();
acceptedPromises.merge(incomingResponse.getId(), 1, Integer::sum);
System.out.println("Received " + incomingResponse.getType() + " from " + acceptorName);
} else {
throw new ClassNotFoundException("Unexpected Class: " + incomingObject.getClass());
}
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
}
一切都像预期的那样工作,除了sendResponse()
在IncomingMemberHandler
中被击中。我试过调试,发现它确实成功地运行了sendResponse()
函数,但是,一旦
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
点击,程序挂起,不再继续。我希望有人能帮我找出问题是什么,因为它把我难住了。(
在向输出流写入对象后,您既没有flush()
也没有close()
,这意味着它可能仍然在服务器的内存中,等待在发送它之前出现更多的东西。TCP/IP有开销;一次一个字节地发送每个字节是非常低效的,这就是为什么存在刷新的概念。