我正在通过函数组合构建一个具有通道的过滤管道,而不是在一个方法中使用120+LOC因为我以后可能会重用这个管道的某个部分。
我没能使它按预期工作。我怀疑函数readValuesFromFile
在scanner.Scan()
将值放入inputStream
通道之前退出(即该方法的主goroutine在(1(goroutine之前退出(。
如果我只在通道中放入一些随机字符串来替换scanner.Scan()
整个管道按预期运行。
这是问题所在还是我遗漏了什么?
如何以优雅的方式解决这个问题?
谢谢!
func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
inputStream := make(chan string)
go func() { //(1)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() { // (2)
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
return inputStream
}
func validateValues(inputStream <-chan string) <-chan string {
//read from the input stream + validate&filter + creating and putting values in an output stream
}
func writeResults(validStream <-chan string) {
//read from the validated stream and write data to file
}
func main() {
valueStream := readValuesFromFile("myfile.txt")
validatedStream := validateValues(valueStream)
writeResults(validatedStream)
}
函数readValuesFromFile
保证在第一个值发送到inputStream
之前返回。在发送器和接收器准备好之前,在无缓冲信道inputStream
上的通信不会成功。在readValuesFromFile
返回之前,inputStream
上没有接收,因此在readValuesFromFile
返回之后,goroutine的发送才会成功。
当函数readValuesFromFile
返回时,defer语句将关闭扫描仪使用的文件。扫描仪可能会在文件关闭之前缓冲一些数据,但也可能不会读取任何数据。
通过关闭goroutine中的文件进行修复。
扫描仪返回的错误描述了问题。始终处理错误。
func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
inputStream := make(chan string)
go func() {
defer file.Close()
defer close(inputStream)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
if scanner.Err() != nil {
// Handle error as appropriate for your application.
log.Print("scan error", err)
}
}()
return inputStream
}