Golang:如何捕获大规模并行基准(> 100 万个任务)的返回值?



我正在构建一个参数优化器,它本质上生成配置,对所有这些进行基准测试,收集所有结果,对它们进行排序,然后根据基准测试结果选择性能最佳的配置。

基准测试本身运行良好,但每次运行需要50毫秒到2秒,具体取决于配置。关键是,优化器生成了大量的配置,也就是说,最低端的配置在10万到高端的配置在4000万之间,大约500万是一个很好的正常范围。显然,单线程版本需要很长时间,而且由于任务相对较轻,CPU负载实际上非常低。

我已经设计了基准测试,使其能够很好地处理并发性,也就是说,运行程序被封装在一个单独的结构(称为代理)中,基准测试本质上是一个将所有状态作为参数的纯函数。本质上,每次运行都会创建自己的状态,然后独立于其他所有函数运行,但所有函数都使用相同的(引用的)共享数据集。功能如下所示。

然而,我很难处理每个基准的回报值。以前,在Scale中,我们使用Async/Await进行任务并行,并让结果继续下去。Go Routines,afaik只适用于没有返回值的函数。在实践中,通道是从goroutine中获取值的最自然的方式。这就是我正在思考的关键:

考虑到我通常有>100万个任务,如何正确有效地获取返回值?

与此相关的是,Golang是否真的有一个非常快速的参数优化器?对于python,我记得optuna提供了出色的结果。

谢谢

func (a *Agent) runOptimization(strategyConfigs []cmdb.Config) (result *bmx.OptimizeResult) {
scores := make([]bmx.BackTestResult, len(strategyConfigs))
println("Run all benchmarks")
for i, config := range strategyConfigs {
state := newState(&config)
score := a.runBenchmark(state)
scores[i] = *score // sort only works on actual values
}

println("Sort results")
sort.Sort(bmx.ByTotalPnL(scores))
println("Select best config")
best := scores[len(scores)-1]
println("Generate best strategy config")
stratConf := a.getStrategyConfig(best.PatternConfig)
println("Return optimization results ")
result = &bmx.OptimizeResult{
Symbol:          best.Symbol,
StrategyType:    best.StrategyType,
OptimizedConfig: &stratConf,
...
}
return result
}

有多种方法可以做到这一点。

A";教科书;一种是:

results := make(chan *score)
for i, config := range strategyConfigs {
state := newState(&config)
go a.runBenchmark(state, results)
}
for i := 0; i < len(strategyConfigs); i++ {
scores[i] = <-results
}

…,然后修改runBenchmark方法,使其不返回任何值并且接受类型为CCD_ 2的第二自变量。

片段滚动如下:

  1. 创建一个通道来交换类型为*score的值
  2. 启动尽可能多的goroutine,运行CCD_;代理人">
    该方法通过提交给它的通道发送(指向)它计算的score对象的指针,然后退出
  3. 另一个循环从通道执行与派生的goroutine一样多的接收,并将每个接收到的值放入结果切片中

注意事项:

  • 这预先假定a可以在多个goroutine同时运行的情况下执行其runBenchmark

    如果不可以,您将需要创建一个单独的a来运行每个单独的goroutine
    鉴于你的例子并不太简单,我很难有根据地猜测它有多难/有多简单。
    如果你需要帮助,请单独问一个狭隘的问题。

  • 如果你将拥有数亿美元;策略配置";,这种方法过于简单,因为所有的goroutine都将同时生成,这a)是对资源的浪费;b) 如果数字太大,甚至可能失败
    教科书上的修正是使用所谓的";扇出"——当你有一个单独的goroutine接收";任务";通过一个通道,并将它们分发到数量有限的工人goroutine上,这些工人goroutines始终保持在一定的限制之下。你可以从这里开始。

另一种方法是采用这样一个事实,即在Go中,数组的每个元素(以及切片——扩展)都被视为一个单独的变量。这意味着,可以同时更新工作程序goroutines中生成的切片的各个元素——只要该切片是预先分配的,并且在该过程进行时从不重新分配(使用append、resliced等进行操作)。

为了演示,让我们使用";等待组":

import "sync"
var wg sync.WaitGroup
scores := make([]*score, len(strategyConfigs))
wg.Add(len(strategyConfigs))
for i, config := range strategyConfigs {
state := newState(&config)
go a.runBenchmark(state, &scores[i], &wg)
}
wg.Wait()

runBenchmark方法应修改为具有

defer wg.Done()

作为其第一个语句,并接受两个附加参数--类型为CCD_ 11和CCD_。

这里,runBenchmark开始在一个单独的goroutine中运行,并被传递一个要用其结果更新的元素的地址和一个等待组的地址,以发出信号"0";任务完成";上。

请注意,与第一种情况基本相同的注意事项也适用。

正如您所看到的,goroutine实际上不会返回任何值。这主要是因为当它可能的时候,产生它的goroutine可能已经不复存在了,而且没有地方可以将这个值返回。

因此,基本上有两种方法来";获取数据";goroutine的:

  • 在通道上发送该值(并让其他goroutine从该通道接收)。

    这是Go的面包和黄油。我建议你从这种方法开始,并使用它,直到你觉得完全适应它

  • 更新内存中的某个位置,前提是没有其他goroutine做同样的事情(否则您将面临数据竞赛)。

    在某些情况下,这种方法可能更快(对一些人来说甚至更简单),但这种代码背后的推理可能更难理解。

您可以从这个开始了解基本概念。


总之,有几个指针。

  • 我建议在掌握其基础知识之前,不要开始编写涉及并发的甚至不太严重的代码。

    请从围棋之旅的相关部分开始,然后转到围棋博客:

    • https://blog.golang.org/pipelines
    • https://blog.golang.org/io2013-talk-concurrency
    • https://blog.golang.org/concurrency-timeouts
  • 先尝试简单的例子。

最新更新