通过使用扫描仪从文件中读取数据来同步可组合goroutine时出现问题.扫描()



我正在通过函数组合构建一个具有通道的过滤管道,而不是在一个方法中使用120+LOC因为我以后可能会重用这个管道的某个部分。

我没能使它按预期工作。我怀疑函数readValuesFromFilescanner.Scan()将值放入inputStream通道之前退出(即该方法的主goroutine在(1(goroutine之前退出(。

如果我只在通道中放入一些随机字符串来替换scanner.Scan()整个管道按预期运行。

这是问题所在还是我遗漏了什么?

如何以优雅的方式解决这个问题?

谢谢!

func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
inputStream := make(chan string)
go func() { //(1)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() { // (2)
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}

close(inputStream)
}()
return inputStream
}
func validateValues(inputStream <-chan string) <-chan string {
//read from the input stream + validate&filter + creating and putting values in an output stream
}
func writeResults(validStream <-chan string) {
//read from the validated stream and write data to file
}
func main() {
valueStream := readValuesFromFile("myfile.txt")
validatedStream := validateValues(valueStream)

writeResults(validatedStream)
}

函数readValuesFromFile保证在第一个值发送到inputStream之前返回。在发送器和接收器准备好之前,在无缓冲信道inputStream上的通信不会成功。在readValuesFromFile返回之前,inputStream上没有接收,因此在readValuesFromFile返回之后,goroutine的发送才会成功。

当函数readValuesFromFile返回时,defer语句将关闭扫描仪使用的文件。扫描仪可能会在文件关闭之前缓冲一些数据,但也可能不会读取任何数据。

通过关闭goroutine中的文件进行修复。

扫描仪返回的错误描述了问题。始终处理错误。

func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
inputStream := make(chan string)
go func() {
defer file.Close()
defer close(inputStream)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
if scanner.Err() != nil {
// Handle error as appropriate for your application.
log.Print("scan error", err)
}
}()
return inputStream
}

最新更新