如何使用Go丢弃并发筛选管道中的值



我想知道并理解如何使用go执行过滤并发管道在生产者/消费者方案中。

我已经设法编写了一个版本,它检查一个值,如果它正常,则将其发送到一个通道如果没有,则将该值发送到另一个通道。

读取并处理完are值后,由两个goroutine负责读取处理后的值并将它们写入文件。这个版本还可以。但是。。。

  1. 假设我不想要无效的值。有没有办法更改select语句(或使用者goroutine(,以便输出正确的值(即仅使用一个输出通道(。我尝试删除invalidValues通道,但是我没有成功。

  2. 我尝试将select语句放入if valid?;其中一个分支包含与此版本相同的完整语句,另一个分支为false只需等待已完成的频道。通过这种方式,我可以丢弃无效值并使用一个通道,但这种方法并没有成功。

关于如何解决这个问题有什么想法吗?

  1. 此外,在这个方案中,我想知道为什么如果我省略从invalidValues通道中删除值的goroutine程序没有完成?通道是否需要清空,否则仍会堵塞?有没有更优雅的方法可以做到这一点价值观

谢谢!!

//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}

这是代码的完整版本

done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)
//Producer reads a file with values and stores them in a channel
go func() {
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(outputStream)
close(invalidValues)
}()
//Write outputStream file
resultFile, err := os.Create("outputStream.txt")
if err != nil {
log.Fatal(err)
}
//Error file
errorFile, err := os.Create("errors.txt")
if err != nil {
log.Fatal(err)
}
//Create two goruotines for writing the outputStream file
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
//Write outputStream and error to files
for r := range outputStream {
_, err := resultFile.WriteString(r + "n")
if err != nil {
log.Fatal(err)
}
}
resultFile.Close()
wg2.Done()
}()
go func() {
for r := range invalidValues {
_, err := errorFile.WriteString(r + "n")
if err != nil {
log.Fatal(err)
}
}
errorFile.Close()
wg2.Done()
}()
wg2.Wait()

删除无效通道:

for value := range inputStream {
var c *chan string
if valid := checkValue(value); valid {
select {
case outputStream <- value
case <-done:
return
}
}
}

如果删除无效值读取器goroutine,则必须将waitgroup更改为:

wg2.Add(1)

这样你就不会无限期地等待。

最新更新