Goroutine实现怀疑



我需要一些帮助来理解如何在这个问题中使用goroutines。我只会发布一些代码片段,但如果你想深入了解,你可以在这里查看

基本上,我有一个分发器函数,它接收多次调用的请求切片,每次调用该函数时,它都必须将此请求分发到其他函数中以实际解决请求。我正在尝试创建一个通道并启动此功能以解决新 goroutine 上的请求,以便程序可以并发处理请求。

如何调用分配函数:

// Run trigger the system to start receiving requests
func Run() {
    // Since the programs starts here, let's make a channel to receive requests
    requestCh := make(chan []string)
    idCh := make(chan string)
    // If you want to play with us you need to register your Sender here
    go publisher.Sender(requestCh)
    go makeID(idCh)
    // Our request pool
    for request := range requestCh {
        // add ID
        request = append(request, <-idCh)
        // distribute
        distributor(request)
    }
    // PROBLEM
    for result := range resultCh {
        fmt.Println(result)
    }
}

分配函数本身:

// Distribute requests to respective channels.
// No waiting in line. Everybody gets its own goroutine!
func distributor(request []string) {
    switch request[0] {
    case "sum":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "sub":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "mult":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "div":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "fibonacci":
        fibCh := make(chan []string)
        go fibonacci.Exec(fibCh, resultCh)
        fibCh <- request
    case "reverse":
        revCh := make(chan []string)
        go reverse.Exec(revCh, resultCh)
        revCh <- request
    case "encode":
        encCh := make(chan []string)
        go encode.Exec(encCh, resultCh)
        encCh <- request
    }
}

还有斐波那契。Exec 函数来说明我如何在给定在 fibCh 上收到的请求并通过 resultCh 发送结果值的情况下尝试计算斐波那契。

func Exec(fibCh chan []string, result chan map[string]string) {
    fib := parse(<-fibCh)
    nthFibonacci(fib)
    result <- fib
}

到目前为止,在 Run 函数中,当我在结果 Ch 上测量时,我得到的结果,但也会出现死锁。但是为什么?另外,我想我应该使用 waitGroup 函数来等待 goroutines 完成,但我不确定如何实现它,因为我期望收到连续的请求流。我将不胜感激一些帮助,以了解我在这里做错了什么以及解决它的方法。

我不是在深入研究您的应用程序的实现细节,但基本上在我看来,您可以使用workers模式。

使用workers模式,多个 goroutines 可以从单个通道读取,从而在 CPU 内核之间分配大量工作,因此称为 worker 。在 Go 中,这种模式很容易实现 - 只需启动一些以通道作为参数的 goroutine,然后将值发送到该通道 - 分发和多路复用将由 Go 运行时自动完成。

下面是工作线程模式的简单实现:

package main
import (
    "fmt"
    "sync"
    "time"
)
func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasksCh
        if !ok {
            return
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println("processing task", task)
    }
}
func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)
    for i := 0; i < workers; i++ {
        go worker(tasksCh, wg)
    }
    for i := 0; i < tasks; i++ {
        tasksCh <- i
    }
    close(tasksCh)
}
func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&wg, 36, 50)
    wg.Wait()
}

另一个有用的资源是这篇不错的文章,如何使用WaitGroup等待所有 goroutines 完成执行(因此不会陷入死锁):

http://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/

以及它的非常基本的实现:

去游乐场

如果您不想将实现更改为使用 worker 模式,最好使用另一个通道来表示 goroutine 执行的结束,因为当没有接收方接受通过无缓冲通道发送的消息时,就会发生死锁。

done := make(chan bool)
//.....
done <- true //Tell the main function everything is done.

因此,当您收到消息时,您可以通过将通道值设置为 true 来将执行标记为已完成。

最新更新