我正在构建一个参数优化器,它本质上生成配置,对所有这些进行基准测试,收集所有结果,对它们进行排序,然后根据基准测试结果选择性能最佳的配置。
基准测试本身运行良好,但每次运行需要50毫秒到2秒,具体取决于配置。关键是,优化器生成了大量的配置,也就是说,最低端的配置在10万到高端的配置在4000万之间,大约500万是一个很好的正常范围。显然,单线程版本需要很长时间,而且由于任务相对较轻,CPU负载实际上非常低。
我已经设计了基准测试,使其能够很好地处理并发性,也就是说,运行程序被封装在一个单独的结构(称为代理)中,基准测试本质上是一个将所有状态作为参数的纯函数。本质上,每次运行都会创建自己的状态,然后独立于其他所有函数运行,但所有函数都使用相同的(引用的)共享数据集。功能如下所示。
然而,我很难处理每个基准的回报值。以前,在Scale中,我们使用Async/Await进行任务并行,并让结果继续下去。Go Routines,afaik只适用于没有返回值的函数。在实践中,通道是从goroutine中获取值的最自然的方式。这就是我正在思考的关键:
考虑到我通常有>100万个任务,如何正确有效地获取返回值?
与此相关的是,Golang是否真的有一个非常快速的参数优化器?对于python,我记得optuna提供了出色的结果。
谢谢
func (a *Agent) runOptimization(strategyConfigs []cmdb.Config) (result *bmx.OptimizeResult) {
scores := make([]bmx.BackTestResult, len(strategyConfigs))
println("Run all benchmarks")
for i, config := range strategyConfigs {
state := newState(&config)
score := a.runBenchmark(state)
scores[i] = *score // sort only works on actual values
}
println("Sort results")
sort.Sort(bmx.ByTotalPnL(scores))
println("Select best config")
best := scores[len(scores)-1]
println("Generate best strategy config")
stratConf := a.getStrategyConfig(best.PatternConfig)
println("Return optimization results ")
result = &bmx.OptimizeResult{
Symbol: best.Symbol,
StrategyType: best.StrategyType,
OptimizedConfig: &stratConf,
...
}
return result
}
有多种方法可以做到这一点。
A";教科书;一种是:
results := make(chan *score)
for i, config := range strategyConfigs {
state := newState(&config)
go a.runBenchmark(state, results)
}
for i := 0; i < len(strategyConfigs); i++ {
scores[i] = <-results
}
…,然后修改runBenchmark
方法,使其不返回任何值并且接受类型为CCD_ 2的第二自变量。
片段滚动如下:
- 创建一个通道来交换类型为
*score
的值 - 启动尽可能多的goroutine,运行CCD_;代理人">
该方法通过提交给它的通道发送(指向)它计算的score
对象的指针,然后退出 - 另一个循环从通道执行与派生的goroutine一样多的接收,并将每个接收到的值放入结果切片中
注意事项:
这预先假定
a
可以在多个goroutine同时运行的情况下执行其runBenchmark
。如果不可以,您将需要创建一个单独的
a
来运行每个单独的goroutine
鉴于你的例子并不太简单,我很难有根据地猜测它有多难/有多简单。
如果你需要帮助,请单独问一个狭隘的问题。如果你将拥有数亿美元;策略配置";,这种方法过于简单,因为所有的goroutine都将同时生成,这a)是对资源的浪费;b) 如果数字太大,甚至可能失败
教科书上的修正是使用所谓的";扇出"——当你有一个单独的goroutine接收";任务";通过一个通道,并将它们分发到数量有限的工人goroutine上,这些工人goroutines始终保持在一定的限制之下。你可以从这里开始。
另一种方法是采用这样一个事实,即在Go中,数组的每个元素(以及切片——扩展)都被视为一个单独的变量。这意味着,可以同时更新工作程序goroutines中生成的切片的各个元素——只要该切片是预先分配的,并且在该过程进行时从不重新分配(使用append
、resliced等进行操作)。
为了演示,让我们使用";等待组":
import "sync"
var wg sync.WaitGroup
scores := make([]*score, len(strategyConfigs))
wg.Add(len(strategyConfigs))
for i, config := range strategyConfigs {
state := newState(&config)
go a.runBenchmark(state, &scores[i], &wg)
}
wg.Wait()
runBenchmark
方法应修改为具有
defer wg.Done()
作为其第一个语句,并接受两个附加参数--类型为CCD_ 11和CCD_。
这里,runBenchmark
开始在一个单独的goroutine中运行,并被传递一个要用其结果更新的元素的地址和一个等待组的地址,以发出信号"0";任务完成";上。
请注意,与第一种情况基本相同的注意事项也适用。
正如您所看到的,goroutine实际上不会返回任何值。这主要是因为当它可能的时候,产生它的goroutine可能已经不复存在了,而且没有地方可以将这个值返回。
因此,基本上有两种方法来";获取数据";goroutine的:
在通道上发送该值(并让其他goroutine从该通道接收)。
这是Go的面包和黄油。我建议你从这种方法开始,并使用它,直到你觉得完全适应它
更新内存中的某个位置,前提是没有其他goroutine做同样的事情(否则您将面临数据竞赛)。
在某些情况下,这种方法可能更快(对一些人来说甚至更简单),但这种代码背后的推理可能更难理解。
您可以从这个开始了解基本概念。
总之,有几个指针。
我建议在掌握其基础知识之前,不要开始编写涉及并发的甚至不太严重的代码。
请从围棋之旅的相关部分开始,然后转到围棋博客:
- https://blog.golang.org/pipelines
- https://blog.golang.org/io2013-talk-concurrency
- https://blog.golang.org/concurrency-timeouts
先尝试简单的例子。