使用通道同步多个 goroutine



我需要启动多个工作 goroutine,它们共用一个任务队列和一个结果队列,每个 worker 都在各自的 goroutine 中运行。我需要等到所有 worker 都完成工作且任务队列为空之后,再退出程序。我为 goroutine 同步准备了一个小例子。主要思路是:对队列中的任务进行计数,并等待所有 worker 完成工作。但当前的实现有时会丢失结果值。为什么会发生这种情况?该如何解决?示例代码:

import (
"fmt"
"os"
"os/signal"
"strconv"
)
// num_workers is the number of worker goroutines that main starts.
const num_workers = 5

// workerChannel is the channel type used to hand tasks to workers.
type workerChannel chan uint64

var (
	// workCh is the shared task queue; it stays nil until main creates it.
	workCh workerChannel
	// cntChannel carries +1/-1 adjustments for the task counter.
	cntChannel chan int
	// tskCnt is the running sum of the adjustments read from cntChannel.
	tskCnt int64
)
// InitWorker runs a worker loop. For every task received from input it sends
// one formatted line on result and then reports the task as finished through
// getTask. num is only used to label the output line.
//
// The loop ends once input is closed and drained.
func InitWorker(input workerChannel, result chan string, num int) {
	for inp := range input {
		result <- "Worker " + strconv.Itoa(num) + ":" + strconv.FormatUint(inp, 10)
		// Report completion only after the result has been handed over.
		// Decrementing the counter first allowed it to reach zero while this
		// worker still held an undelivered result, so results could be lost.
		getTask()
	}
}
// taskCounter adds every value received from inp to the package-level tskCnt
// and returns once inp is closed and drained. It is meant to run in a single
// dedicated goroutine so that all updates to tskCnt are applied in the order
// they were received.
//
// The update itself is a plain, unsynchronized write: code that reads tskCnt
// from another goroutine must provide its own synchronization.
func taskCounter(inp chan int) {
	// Ranging over the channel stops on close; a bare receive loop would keep
	// spinning on zero values from a closed channel.
	for val := range inp {
		tskCnt += int64(val)
	}
}
// putTask queues one task: it prints the value, sends +1 on cntChannel to
// bump the task counter, and then sends the value on workCh.
func putTask(val uint64) {
	fmt.Println("Put ", val)
	cntChannel <- 1
	workCh <- val
}
// getTask records that one task has been taken off the queue by sending -1
// on cntChannel.
func getTask() {
	cntChannel <- -1
}
// main starts num_workers workers and the task counter, queues numTasks tasks
// from a producer goroutine, and prints every result. It prints "Done" and
// returns after all results have been printed, or prints "Aborted." and exits
// as soon as an interrupt signal arrives.
func main() {
	// Number of tasks the producer queues, and therefore the number of
	// results that must be printed before the program may finish.
	const numTasks = 21

	// signal.Notify does not block when delivering a signal, so the channel
	// needs a buffer or an interrupt can be dropped.
	abort := make(chan os.Signal, 1)
	signal.Notify(abort, os.Interrupt)

	// Result queue and task queue.
	result := make(chan string)
	workCh = make(workerChannel)
	for i := 0; i < num_workers; i++ {
		go InitWorker(workCh, result, i)
	}

	// putTask and getTask send on cntChannel, so the counter goroutine must be
	// running to receive from it even though main does not read tskCnt.
	cntChannel = make(chan int)
	go taskCounter(cntChannel)

	// Producer: queue all tasks.
	go func() {
		for i := uint64(0); i < numTasks; i++ {
			putTask(i)
		}
	}()

	// Every task produces exactly one result, so counting the results that
	// were actually printed is a race-free completion condition. Polling
	// len() of an unbuffered channel (always 0) or reading tskCnt without
	// synchronization cannot guarantee that all results have been consumed.
	for printed := 0; printed < numTasks; {
		select {
		case <-abort:
			fmt.Println("Aborted.")
			os.Exit(0)
		case res := <-result:
			fmt.Println(res)
			printed++
		}
	}
	fmt.Println("Done")
}

使用 sync.WaitGroup 等待 goroutine 完成。关闭通道,使在该通道上循环读取的代码退出。

package main
import (
"fmt"
"sync"
)
// num_workers is the size of the worker pool.
const num_workers = 5

// workerChannel carries task values to the workers.
type workerChannel chan uint64

// main fans 21 tasks out to num_workers goroutines and prints one line per
// task. Shutdown is driven purely by channel closes: the producer closes the
// task channel, which ends every worker; once all workers have returned, the
// result channel is closed, which ends the printing loop.
func main() {
	tasks := make(workerChannel)
	out := make(chan string)

	// Worker pool: each worker formats tasks until the task channel is closed.
	var pool sync.WaitGroup
	for id := 0; id < num_workers; id++ {
		pool.Add(1)
		go func(id int) {
			defer pool.Done()
			for {
				task, ok := <-tasks
				if !ok {
					return
				}
				out <- fmt.Sprintf("worker %d, task %d", id, task)
			}
		}(id)
	}

	// Producer: queue the work, then close the channel to release the workers.
	go func() {
		defer close(tasks)
		for n := uint64(0); n < 21; n++ {
			tasks <- n
		}
	}()

	// Close the result channel only after every worker has finished sending.
	go func() {
		pool.Wait()
		close(out)
	}()

	// Print results until the result channel is closed.
	for {
		line, ok := <-out
		if !ok {
			break
		}
		fmt.Println(line)
	}
}

https://play.golang.org/p/ZifpzsP6fNv

我建议使用 close(chan) 来完成此类任务。

WaitGroup 版本。

package main
import (
"log"
"sync"
)
// worker logs every value received from in until in is closed and drained,
// then marks itself as done on wg.
func worker(in chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		v, ok := <-in
		if !ok {
			return
		}
		log.Println(v)
	}
}
// main starts 25 workers, feeds them the integers 0 through 30 over an
// unbuffered channel, closes the channel, and waits for every worker to exit.
func main() {
	const (
		workers = 25
		last    = 30
	)

	jobs := make(chan int)

	var wg sync.WaitGroup
	for n := 0; n < workers; n++ {
		wg.Add(1)
		go worker(jobs, &wg)
	}

	for v := 0; v <= last; v++ {
		jobs <- v
	}

	// Closing jobs ends each worker's receive loop; Wait then blocks until
	// all of them have called Done.
	close(jobs)
	wg.Wait()
}

通道版本

package main
import (
"log"
"os"
)
// worker logs every value received from in until in is closed and drained,
// and sends a single empty struct on end as it returns.
func worker(in chan int, end chan struct{}) {
	notify := func() {
		end <- struct{}{}
	}
	defer notify()

	for {
		v, ok := <-in
		if !ok {
			return
		}
		log.Println(v)
	}
}
// main starts 25 workers, sends them the integers 0 through 30, and closes the
// input channel. A collector goroutine counts one end signal per worker,
// logging "fin <count>" for each, and exits the process with status 0 once
// all workers have reported.
func main() {
	const (
		workers = 25
		last    = 30
	)

	jobs := make(chan int)
	finished := make(chan struct{})

	// Collector: started before the workers so a receiver for their end
	// signals is already in place.
	go func() {
		for fin := 1; fin <= workers; fin++ {
			<-finished
			log.Println("fin", fin)
		}
		close(finished)
		os.Exit(0)
	}()

	for n := 0; n < workers; n++ {
		go worker(jobs, finished)
	}

	for v := 0; v <= last; v++ {
		jobs <- v
	}
	close(jobs)

	// Park the main goroutine; the collector terminates the process.
	select {}
}

最新更新