我的客户端将一个文件分成多个块(每个块128mb(,然后它会使用goroutines同时将块上传到多个服务器。
但是,当我使用超过 1 个 goroutine 时,我的客户端程序出现错误。
write tcp [::1]:49324->[::1]:2001: write: broken pipe
在我的服务器中,错误是
EOF
请注意,断开的管道错误和EOF错误发生在不同的块中。例如,写入区块 1 时可能会发生管道中断错误,而服务器接收区块 2 时可能会发生 EOF 错误。
下面是客户端代码:
//set maximum no. of goroutine running in the back
maxGoroutines := 3
guard := make(chan struct{}, maxGoroutines)
var sentByte int64
for i:= 0; i < chunkCount; i += 1{
guard <- struct{}{}
go func(i int){
index := i%len(serverList)
vsConnection, _ := net.Dial("tcp", serverList[index])
sentByte=0
file, _ := os.Open(fileName)
file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
for {
n, _ := file.Read(sendBuffer)
n2, err2 := vsConnection.Write(sendBuffer[:n])
if err2 != nil {
fmt.Println("err2",err2,chunkName)
}
if(n2!=65536){ //65536 is size of sendBuffer
fmt.Println("n2",n2)
}
sentByte = sentByte+int64(n)
if(sentByte == CHUNKSIZE){
break;
}
}
vsConnection.Close()
file.Close()
<-guard
}(i)
}
以下是服务器代码:
func main() {
mapping := cmap.New()
server, error := net.Listen("tcp", ":2001")
if error != nil {
fmt.Println("There was an error starting the server" + error.Error())
return
}
for {
connection, error := server.Accept()
if error != nil {
fmt.Println("There was am error with the connection" + error.Error())
return
}
//one goroutine per connection
go ConnectionHandler(connection,mapping)
}
}
func ConnectionHandler(connection net.Conn, mapping cmap.ConcurrentMap) {
fmt.Println("Connected")
//make a buffer to hold data
var bufferFile bytes.Buffer
writer := bufio.NewWriter(&bufferFile)
var receivedBytes int64
receivedBytes=0
for {
if(CHUNKSIZE<=receivedBytes){
break
}
n,err := io.CopyN(writer, connection, BUFFERSIZE)
receivedBytes += n
if err != nil {
fmt.Println("err", err.Error(), fileName)
break
}
}
mapping.Set(fileName,bufferFile.Bytes())
connection.Close()
}
提前非常感谢。
在你的客户端中,sentByte
应该是发送方goroutine的局部变量。由于已将其声明为全局,因此代码中存在争用条件。请尝试以下解决方法:
go func(i int){
index := i%len(serverList)
vsConnection, _ := net.Dial("tcp", serverList[index])
sentByte := 0 // make sentByte a local variable, so each goroutine
// has its own copy
file, _ := os.Open(fileName)
file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
for {
n, _ := file.Read(sendBuffer)
// ...