如何在golang中在流上发送数据时使工作并发



我有一个golanggrpc服务器,它有流式端点。早些时候,我按顺序完成所有工作并在流上发送,但后来我意识到我可以使工作并发,然后在流上进行发送。从grpc-go文档:我知道我可以使工作并发,但你不能使流上的发送并发,所以我得到了下面的代码来完成这项工作。

以下是我在流式传输端点中的代码,它以流式传输的方式将数据发送回客户端。这同时完成所有工作。

// get "allCids" from lot of files and load in memory.
allCids := .....
var data = allCids.([]int64)
out := make(chan *custPbV1.CustomerResponse, len(data))
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
wg.Wait()
close(out)
}()
for _, cid := range data {
go func (id int64) {
defer wg.Done()
pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
if !pd.IsCorrect {
return
}
resources := us.helperCom.GenerateResourceString(pd)
val, err := us.GenerateInfo(clientId, resources, cfg)
if err != nil {
return
}
out <- val
}(cid)
}
for val := range out {
if err := stream.Send(val); err != nil {
log.Printf("send error %v", err)
}
}

现在我的问题是data切片的大小可能大约是一百万,所以我不想产生一百万的常规工作。我该如何处理这种情况?如果我使用100而不是len(data),那么这对我有用吗?或者我也需要在100子阵列中切片数据?我只是困惑于处理这个问题的最佳方法是什么?

我最近开始使用golang,所以如果在并发时上面的代码中有任何错误,请原谅。

请检查此伪代码

func main() {
works := make(chan int, 100)
errChan := make(chan error, 100)
out := make(chan *custPbV1.CustomerResponse, 100)
// spawn fixed workers
var workerWg sync.WaitGroup
for i := 0; i < 100; i++ {
workerWg.Add(1)
go worker(&workerWg, works, errChan, out)
}
// give input
go func() {
for _, cid := range data {
// this will be blocked if all the workers are busy and no space is left in the channel.
works <- cid
}
close(works)
}()
var analyzeResults sync.WaitGroup
analyzeResults.Add(2)
// process errors
go func() {
for err := range errChan {
log.Printf("error %v", err)
}
analyzeResults.Done()
}()
// process outout
go func() {
for val := range out {
if err := stream.Send(val); err != nil {
log.Printf("send error %v", err)
}
}
analyzeResults.Done()
}()
workerWg.Wait()
close(out)
close(errChan)
analyzeResults.Wait()
}
func worker(job *sync.WaitGroup, works chan int, errChan chan error, out chan *custPbV1.CustomerResponse) {
defer job.Done()
// Idle worker takes the work from this channel. 
for cid := range works {
pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
if !pd.IsCorrect {
errChan <- errors.New(fmt.Sprintf("pd %d is incorrect", pd))
// we can not return here as the total number of workers will be reduced. If all the workers does this then there is a chance that no workers are there to do the job
continue
}
resources := us.helperCom.GenerateResourceString(pd)
val, err := us.GenerateInfo(clientId, resources, cfg)
if err != nil {
errChan <- errors.New(fmt.Sprintf("got error", err))
continue
}
out <- val
}
}

解释:

这是一个工作池实现,我们在其中生成固定数量的goroutine(此处为100个工作程序(来执行相同的工作(GetCustomerData((&GenerateInfo(((,但具有不同的输入数据(此处为cid(。这里的100个工人并不意味着它是并行的,而是并发的(取决于GOMAXPROCS(。如果一个工作程序正在等待io结果(基本上是一些阻塞操作(,那么特定的goroutine将被上下文切换,其他工作程序goroutine有机会执行。但是,增加goroutuines(工人(可能不会带来太多性能,但可能会导致通道上的争用,因为更多的工人正在该通道上等待输入作业。

将100万个数据拆分为子切片的好处在于。假设我们有1000个工作岗位和100名工人。每个工人将被分配到1-10、11-20等工作。如果前10个工作比其他工作花费更多的时间怎么办。在这种情况下,第一个工作人员过载,其他工作人员将完成任务,并且即使存在挂起的任务也将处于空闲状态。因此,为了避免这种情况,这是最好的解决方案,因为空闲的工人将承担下一份工作。因此,与的其他工人相比,没有哪个工人更超载

最新更新