我正在尝试处理从AWS S3读取的CSV文件,对于每一行文本,我都想激活worker
函数来做一些工作并返回结果
理想情况下,我希望结果按原始CSV排序,但这不是一个要求,出于某种原因,当我运行此代码时,我会遇到奇怪的数据竞赛和这行:
for result := range output {
results = append(results, result)
}
永远阻止
我尝试使用一个WaitGroup,但它也不起作用,关闭output
频道也会导致我出现错误";试图把一些东西放在一个封闭的通道里";
func main() {
resp, err := ReadCSV(bucket, key)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := csv.NewReader(resp.Body)
detector := NewDetector(languages)
var results []DetectionResult
numWorkers := 4
input := make(chan string, numWorkers)
output := make(chan DetectionResult, numWorkers)
start := time.Now()
for w := 1; w < numWorkers+1; w++ {
go worker(w, detector, input, output)
}
go func() {
for {
record, err := reader.Read()
if err == io.EOF {
close(input)
break
}
if err != nil {
log.Fatal(err)
}
text := record[0]
input <- text
}
}()
for result := range output {
results = append(results, result)
}
elapsed := time.Since(start)
log.Printf("Decoded %d lines of text in %s", len(results), elapsed)
}
func worker(id int, detector lingua.LanguageDetector, input chan string, output chan DetectionResult) {
log.Printf("worker %d startedn", id)
for t := range input {
result := DetectText(detector, t)
output <- result
}
log.Printf("worker %d finishedn", id)
}
尝试处理CSV(最好按顺序(,并使用对worker
的函数调用结果来丰富它
尝试设置WaitGroup,尝试在完成读取时关闭输出通道(EOF(-导致错误
for循环将读取直到output
通道关闭。处理完所有输入后(而不是读取完输入后(,必须关闭output
通道。
您可以为此使用等待组:
func worker(detector lingua.LanguageDetector, wg *sync.WaitGroup) func(id int, input chan string, output chan DetectionResult) {
wg.Add(1)
return func(id int, input chan string, output chan DetectionResult) {
defer wg.Done() // Notify wg when processing is finished
log.Printf("worker %d startedn", id)
for t := range input {
result := DetectText(detector, t)
output <- result
}
log.Printf("worker %d finishedn", id)
}
}
然后:
go func() {
wg.Wait()
close(output)
}()
for result := range output {
results = append(results, result)
}
我发现你错过了一种向工人发出信号的方法,即没有更多的工作,他们应该停止工作。你还需要一种方式让工人们发出信号,表明他们确实完成了任务。当所有这些信号都被发送和接收时,main应该控制所有工人的累积结果。
在所有CSV记录迭代后,我们可以通过关闭输入来向工人发出信号,并且所有作业都已通过输入发送:
nWorkers := 4
input := make(chan Tx, nWorkers*2) // buffer so input (the "jobs queue") is always full; see rationale at bottom of answer
output := make(chan Ty)
done := make(chan bool)
for i := 1; i < nWorkers+1; i++ {
go worker(input, output, done)
}
go func() {
for {
record, _ := reader.Read()
input <- record[0]
}
close(input)
}()
在没有更多作业时,在输入时发送作业的goroutine可以安全地关闭输入。工人仍然可以接收任何仍在输入中的工作,即使在它关闭后也是如此。
当输入被关闭并且最终为空时,工作者的范围循环退出。然后,一名工作人员通过在已完成的通道上发送信号返回:
func worker(input <-chan Tx, output chan<- Ty, done <-chan bool) {
for x := range input { // loop until input is closed
output <- doWork(x)
}
done <- true // finally send done
}
当我们收到nWorker完成消息的数量时,我们知道所有工作都已完成,并且工作人员不会在输出时发送,因此关闭输出是安全的:
go func() {
log.Println("counting done workers")
var doneCtr int
for {
select {
case <-done:
log.Println("got done")
doneCtr++
}
if doneCtr == nWorkers {
close(output) // signal the results appender to stop
log.Println("closed output")
}
}
}()
关闭输出是向主设备发出的信号,它可以停止尝试接收并累积结果:
results := make([]result, 0)
for result := range output {
results = append(results, result)
}
最后:所有其他goroutine都已终止,main可以继续累积结果。
至于以原始订单获得结果,这只是将原始订单与每个作业一起发送,将该订单与结果一起发送回,然后根据订单进行排序:
type row struct {
num int
text string
}
type result struct {
lang language
row row
}
...
input <- row{rowNum, record[0]}
rowNum++
...
output <- result{detect(row.text), row}
...
results = append(results, result)
...
sort.Slice(results, func(i, j int) bool { return results[i].row.num < results[j].row.num})
我在The Go Playground制作了一个完整的工作模型。
我的理由可能是缓冲,但在我看来,唯一真正令人失望的是发现工人在等待输入时停滞不前。将输入缓冲2倍的工人数量可以确保每个工人在任何时刻平均有两个工作在等待。