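I have a continuously running Go program that relies entirely on goroutines plus one manager thread. The main thread just starts goroutines and otherwise sleeps.
There is a memory leak. The program uses more and more memory until it exhausts all 16GB of RAM and 32GB of swap, and then every goroutine panics. It is actually the OS running out of memory that triggers the panic: it usually happens when I try to execute anotherapp, and the panic is fork/exec ./anotherapp: cannot allocate memory.
When this happens, all of the worker goroutines panic, are recovered, and are restarted. At that point the memory usage does not decrease. It stays at 48GB even though almost nothing is allocated any more. This means every goroutine keeps panicking for lack of memory until the whole executable is killed and restarted from scratch.
The whole thing is about 50,000 lines, but the actual problem area is as follows: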
package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
	"sync/atomic"
	"time"
	// plus the third-party unleak package (import path not shown here)
)

type queue struct {
	identifier string
	kind       bool // "type" is a reserved word in Go, so the field is named kind here
}

func main() {
	// Set number of goroutines that can be run
	var xthreads int32 = 10
	var usedthreads int32
	runtime.GOMAXPROCS(14)
	ready := make(chan *queue, 5)
	// Start the manager goroutine, which prepares identifiers in the background ready for processing, always with 5 waiting to go
	go manager(ready)
	// Start creating goroutines to process as they are ready
	for obj := range ready { // loops through "ready" channel and waits when there is nothing
		// This section uses atomic instead of a blocking channel in an earlier attempt to stop the memory leak, but it didn't work
		for atomic.LoadInt32(&usedthreads) >= xthreads {
			time.Sleep(time.Second)
		}
		debug.FreeOSMemory()             // Try to clean up the memory, also did not stop the leak
		atomic.AddInt32(&usedthreads, 1) // Mark goroutine as started
		// Unleak obj, probably unnecessary, but just to be safe
		item := new(queue) // named item rather than copy to avoid shadowing the builtin
		item.identifier = unleak.String(obj.identifier) // unleak is a 3rd party package that makes a copy of the string
		item.kind = obj.kind
		go runit(item, &usedthreads) // Start the processing goroutine
	}
	fmt.Println(`END`) // This should never happen as the channels are never closed
}

func manager(ready chan *queue) {
	// This goroutine communicates with another server and fills the "ready" channel
}

// This is the goroutine
func runit(obj *queue, threadcount *int32) {
	defer func() {
		if r := recover(); r != nil {
			// Panicked
			erstring := fmt.Sprint(r)
			reportFatal(obj.identifier, erstring)
		} else {
			// Completed successfully
			reportDone(obj.identifier)
		}
		atomic.AddInt32(threadcount, -1) // Mark goroutine as finished
	}()
	do(obj) // This function does the actual processing
}
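From what I understand, when the do function (last line) ends, whether because it finished or because it panicked, the runit function ends too. That ends the goroutine entirely, which means all of the memory used by that goroutine should now be free. That is not what happens. What happens is that this app just uses more and more memory until it can no longer function, all of the runit goroutines panic, and yet the memory does not decrease.
Profiling does not reveal anything suspicious. The leak appears to be outside the profiler's scope.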
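One way to check whether the memory really is outside the profiler's reach is to log a few runtime counters periodically and compare them with what the OS reports for the process. The sketch below is only an illustration: the memSnapshot type and the snapshot function are made-up names, not part of the program above. If Sys stays far below the process's resident size, the memory is probably not managed by the Go runtime at all; if Goroutines keeps climbing, goroutines are being leaked rather than finishing.

```go
package main

import (
	"fmt"
	"runtime"
)

// memSnapshot (hypothetical helper type) holds a few runtime counters, in bytes.
type memSnapshot struct {
	HeapAlloc    uint64 // bytes of allocated heap objects
	HeapIdle     uint64 // bytes in idle (unused) heap spans
	HeapReleased uint64 // bytes of heap memory returned to the OS
	Sys          uint64 // total bytes obtained from the OS by the Go runtime
	Goroutines   int    // number of goroutines that currently exist
}

// snapshot reads the current counters from the runtime.
func snapshot() memSnapshot {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return memSnapshot{
		HeapAlloc:    m.HeapAlloc,
		HeapIdle:     m.HeapIdle,
		HeapReleased: m.HeapReleased,
		Sys:          m.Sys,
		Goroutines:   runtime.NumGoroutine(),
	}
}

func main() {
	s := snapshot()
	fmt.Printf("heapAlloc=%d heapIdle=%d heapReleased=%d sys=%d goroutines=%d\n",
		s.HeapAlloc, s.HeapIdle, s.HeapReleased, s.Sys, s.Goroutines)
}
```

In a long-running program this would be called from a ticker loop, so the numbers can be lined up against the point where memory starts to grow.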
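Please consider inverting the pattern: instead of launching a new goroutine for every item, start a fixed number of long-lived workers and hand them work over a channel. See below.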
package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

// I do work
func worker(id int, work chan int) {
	for i := range work {
		// Work simulation: sleep for a random whole number of seconds below i
		d := time.Duration(rand.Intn(i)) * time.Second
		log.Printf("Worker %d, sleeping for %v", id, d)
		time.Sleep(d)
	}
}

// Return some fake work (always 1 or 2, so rand.Intn above never sees 0)
func getWork() int {
	return rand.Intn(2) + 1
}

func main() {
	wg := new(sync.WaitGroup)
	work := make(chan int)
	// run 10 workers
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			worker(i, work)
			wg.Done()
		}(i)
	}
	// main "thread"
	for i := 0; i < 100; i++ {
		work <- getWork()
	}
	// signal there is no more work to be done
	close(work)
	// Wait for the workers to exit
	wg.Wait()
}