剥离文件并通过TCP同时将块写入服务器显示损坏的管道错误



我的客户端将一个文件分成多个块(每个块128mb(,然后它会使用goroutines同时将块上传到多个服务器。

但是,当我使用超过 1 个 goroutine 时,我的客户端程序出现错误。

write tcp [::1]:49324->[::1]:2001: write: broken pipe

在我的服务器中,错误是

EOF

请注意,断开的管道错误和EOF错误发生在不同的块中。例如,写入区块 1 时可能会发生管道中断错误,而服务器接收区块 2 时可能会发生 EOF 错误。

下面是客户端代码:

//set maximum no. of goroutine running in the back
maxGoroutines := 3
guard := make(chan struct{}, maxGoroutines)
var sentByte int64
for i:= 0; i < chunkCount; i += 1{
    guard <- struct{}{} 
    go func(i int){
        index := i%len(serverList)
        vsConnection, _ := net.Dial("tcp", serverList[index])
        sentByte=0
        file, _ := os.Open(fileName)
        file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
        for { 
            n, _ := file.Read(sendBuffer)
            n2, err2 := vsConnection.Write(sendBuffer[:n])
            if err2 != nil {
                fmt.Println("err2",err2,chunkName)              
            }
            if(n2!=65536){ //65536 is size of sendBuffer
                fmt.Println("n2",n2)
            }
            sentByte = sentByte+int64(n)
            if(sentByte == CHUNKSIZE){
                break;
            }
        }
        vsConnection.Close()
        file.Close()
        <-guard
    }(i)
}

以下是服务器代码:

func main() {
    mapping := cmap.New()
    server, error := net.Listen("tcp", ":2001")
    if error != nil {
        fmt.Println("There was an error starting the server" +    error.Error())
        return
    }
    for {
        connection, error := server.Accept()
        if error != nil {
            fmt.Println("There was am error with the connection" + error.Error())
            return
        }
        //one goroutine per connection
        go ConnectionHandler(connection,mapping)
    }
}
func ConnectionHandler(connection net.Conn, mapping cmap.ConcurrentMap) {
    fmt.Println("Connected")
    //make a buffer to hold data        
    var bufferFile bytes.Buffer
    writer := bufio.NewWriter(&bufferFile)
    var receivedBytes int64
    receivedBytes=0
    for {
        if(CHUNKSIZE<=receivedBytes){
            break
        }
        n,err := io.CopyN(writer, connection, BUFFERSIZE)
        receivedBytes += n
        if err != nil {
            fmt.Println("err", err.Error(), fileName)
            break
        }
    }
    mapping.Set(fileName,bufferFile.Bytes())
    connection.Close()
}

提前非常感谢。

在你的客户端中,sentByte应该是发送方goroutine的局部变量。由于已将其声明为全局,因此代码中存在争用条件。请尝试以下解决方法:

go func(i int){
    index := i%len(serverList)
    vsConnection, _ := net.Dial("tcp", serverList[index])
    sentByte := 0 // make sentByte a local variable, so each goroutine 
                  // has its own copy 
    file, _ := os.Open(fileName)
    file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
    for { 
        n, _ := file.Read(sendBuffer)
        // ...

最新更新