使用多个生产者(使用 goroutine)实现信号量



这是我存在的祸根。

type ec2Params struct {
    sess *session.Session
    region string
}
type cloudwatchParams struct {
    cl cloudwatch.CloudWatch
    id string
    metric string
    region string
}
type request struct {
    ec2Params
    cloudwatchParams
}
// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request
func main() {
    sem := make(chan bool, maxRoutines)
    for i := 0; i < maxRoutines; i++ {
        sem <- true
    }
    req := make(chan request)
    go func() { // This is my the producer
        for _, arn := range arns {
            arnCreds := startSession(arn)
            for _, region := range regions {
                sess, err := session.NewSession(
                    &aws.Config{****})
                if err != nil {
                    failOnError(err, "Can't assume role")
                }
                req <- request{ec2Params: ec2Params{ **** }}
            }
        }
    }() 
    for f := range(req) {
        <- sem
        if (ec2Params{}) != f.ec2Params {
            go getEC2Metrics(****)
        } else {
            // I should be excercising this line of code too, 
            // but I'm not :(
            go getMetricFromCloudwatch(****) 
        }   
        sem <- true
    }
}

getEC2MetricsgetCloudwatchMetrics 是要执行的 goroutines

func getMetricFromCloudwatch(cl cloudwatch.CloudWatch, id, metric, region string) {
    // Magic
}
func getEC2Metrics(sess *session.Session, region string) {
    ec := ec2.New(sess)
    var ids []string
    l, err := ec.DescribeInstances(&ec2.DescribeInstancesInput{})
    if err != nil {
        fmt.Println(err.Error())
    } else {
        for _, rsv := range l.Reservations {
            for _, inst := range rsv.Instances {
                ids = append(ids, *inst.InstanceId)
            }
        }
        metrics := cfg.AWSMetric.Metric
        if len(ids) >= 0 {
            cl := cloudwatch.New(sess)
            for _, id := range ids{
                for _, metric := range metrics {
                    // For what I can tell, execution get stuck here
                    req <- request{ cloudwatchParams: ***** }}
                }
            }
        }
    }
}

maingetEC2Metrics中的匿名制作者都应该以异步方式发布数据req,但到目前为止,似乎getEC2Metrics发布到频道的任何内容从未被处理过。看起来有什么东西阻止我从goroutine中发布,但我什么也没找到。我很想知道如何做到这一点并产生不成比例的行为(这是一个实际工作的信号量(。

实现的基础可以在这里找到:https://burke.libbey.me/conserving-file-descriptors-in-go/

我很疯狂,JimB 的评论让车轮旋转,现在我已经解决了这个问题!

// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request // Not reachable inside getEC2Metrics
func getEC2Metrics(sess *session.Session, region string, req chan <- request ) {
    ....
    ....
            for _, id := range ids{
                for _, metric := range metrics {
                    req <- request{ **** }} // When using the global req, 
                                            // this would block
                }
            }
    ....
    ....
}
func main() {
    sem := make(chan bool, maxRoutines)
    for i := 0; i < maxRoutines; i++ {
        sem <- true    
    }
    req := make(chan request)
    go func() {
        // Producing tasks
    }()
    for f := range(req) {
        <- sem // checking out tickets outside the goroutine does block 
               //outside of the goroutine
        go func() {
            defer func() { sem <- true }()
            if (ec2Params{}) != f.ec2Params {
                getEC2Metrics(****, req) // somehow sending the channel makes
                                         // possible to publish to it
            } else {
                getMetricFromCloudwatch(****)
            }
        }()
    }
}

有两个问题:

  1. 信号量没有锁定(我认为这是因为我在 goroutine 中签出和标记,所以可能存在竞争条件(。
  2. 出于某种原因,getEC2Metrics 没有正确解决全局通道 req,因此在尝试发布到显然在范围内的通道时,它会让所有 goroutines 卡住,但事实并非如此(我真的不知道为什么(。

老实说,我只是对第二项很幸运,到目前为止,我还没有找到任何关于这个怪癖的文档,但最后我很高兴它正在工作。

最新更新