加载带有和不使用 go-例程的地图

  • 本文关键字:go- 例程 地图 加载 go
  • 更新时间 :
  • 英文 :


这是我遇到的一个有趣的情况。我需要从文件中读取,并在使用 go-routine 进行一些数据操作后,根据我们发现的内容填充地图。以下是简化的问题陈述和示例:

生成运行gen_data.sh所需的数据

#!/bin/bash 
rm some.dat || : 
for i in `seq 1 10000`; do 
echo "$i `date` tx: $RANDOM rx:$RANDOM" >> some.dat
done

如果我将这些行some.dat读到没有使用loadtoDict.gogo-例程的map[int]string中,它会保持对齐。(如第 1 个和第 2 个单词相同,请参阅下面的 O/P。

在现实生活中,我确实需要在将行加载到地图之前处理这些行(昂贵(,使用 go-routines 可以加快我的字典创建速度,这是解决实际问题的重要要求。

loadtoDict.go

package main
import (
"bufio"
"fmt"
"log"
"os"
)
var (
fileName = "some.dat"
)
func checkerr(err error) {
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
}
func main() {
ourDict := make(map[int]string)
f, err := os.Open(fileName)
checkerr(err)
defer f.Close()
fscanner := bufio.NewScanner(f)
indexPos := 1
for fscanner.Scan() {
text := fscanner.Text()
//fmt.Println("text", text)
ourDict[indexPos] = text
indexPos++
}
for i, v := range ourDict {
fmt.Printf("%d: %sn", i, v)
}
}

运行:

$ ./loadtoDict
...
8676: 8676 Mon Dec 23 15:52:24 PST 2019 tx: 17718 rx:1133
2234: 2234 Mon Dec 23 15:52:20 PST 2019 tx: 13170 rx:15962
3436: 3436 Mon Dec 23 15:52:21 PST 2019 tx: 17519 rx:5419
6177: 6177 Mon Dec 23 15:52:23 PST 2019 tx: 5731 rx:5449

请注意第一个和第 2 个单词是如何"对齐"的。但是,如果我使用 go-例程加载我的地图,就会出错:

async_loadtoDict.go

package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
var (
fileName = "some.dat"
mu       = &sync.RWMutex{}
MAX = 9000
)
func checkerr(err error) {
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
}
func main() {
ourDict := make(map[int]string)
f, err := os.Open(fileName)
checkerr(err)
defer f.Close()
fscanner := bufio.NewScanner(f)
indexPos := 1
var wg sync.WaitGroup
sem := make(chan int, MAX)
defer close(sem)
for fscanner.Scan() {
text := fscanner.Text()
wg.Add(1)
sem <- 1
go func() {
mu.Lock()
defer mu.Unlock()
ourDict[indexPos] = text
indexPos++
<- sem
wg.Done()
}()
}
wg.Wait()
for i, v := range ourDict {
fmt.Printf("%d: %sn", i, v)
}
}

输出:

$ ./async_loadtoDict 
...
11: 22 Mon Dec 23 15:52:19 PST 2019 tx: 25688 rx:7602
5716: 6294 Mon Dec 23 15:52:23 PST 2019 tx: 28488 rx:3572
6133: 4303 Mon Dec 23 15:52:21 PST 2019 tx: 24286 rx:1565
7878: 9069 Mon Dec 23 15:52:25 PST 2019 tx: 16863 rx:24234
8398: 7308 Mon Dec 23 15:52:23 PST 2019 tx: 4321 rx:20642
9566: 3489 Mon Dec 23 15:52:21 PST 2019 tx: 14447 rx:12630
2085: 2372 Mon Dec 23 15:52:20 PST 2019 tx: 14375 rx:24151

尽管使用互斥锁保护引入ourDict[indexPos]。我希望我的地图索引与引入尝试保持一致。

谢谢!

您的信号量sem不起作用,因为您对其进行了深度缓冲。

通常,这是为此类任务设置地图的错误方法,因为读取文件将是缓慢的部分。 如果你有一个更复杂的任务——例如,阅读一行,思考很多,设置一些东西——你会希望它作为你的伪代码结构:

type workType struct {
index int
line  string
}
var wg sync.WaitGroup
wg.Add(nWorkers)
// I made this buffered originally but there's no real point, so
// fixing that in an edit
work := make(chan workType)
for i := 0; i < nWorkers; i++ {
go readAndDoWork(work, &wg)
}
for i := 1; fscanner.Scan(); i++ {
work <- workType{index: i, line: fscanner.Text()}
}
close(work)
wg.Wait()
... now your dictionary is ready ...

随着工人这样做:

func readAndDoWork(ch chan workType, wg *sync.WorkGroup) {
for item := range ch {
... do computation ...
insertIntoDict(item.index, result)
}
wg.Done()
}

insertIntoDict获取互斥锁(以保护映射从索引到结果(并写入字典。 (如果您愿意,可以将其内联。

这里的想法是设置一定数量的工作器(可能基于可用 CPU 的数量(,每个工作线程获取下一个工作项并处理它。 主 goroutine 只是分配工作,然后关闭工作通道——这将导致所有工作线程看到输入结束——然后等待他们发出他们完成计算的信号。

(如果您愿意,可以再创建一个 goroutine,用于读取工作线程计算的结果并将其放入映射中。 这样,映射本身就不需要互斥锁。

正如我在评论中提到的,您无法控制 goroutines 的执行顺序,因此不应从它们内部更改索引。

下面是一个示例,其中与映射的交互在单个 goroutine 中,而您的处理在其他例程中:

package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
var (
fileName = "some.dat"
MAX      = 9000
)
func checkerr(err error) {
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
}
type result struct {
index int
data string
}
func main() {
ourDict := make(map[int]string)
f, err := os.Open(fileName)
checkerr(err)
defer f.Close()
fscanner := bufio.NewScanner(f)
var wg sync.WaitGroup
sem := make(chan struct{}, MAX) // Use empty structs for semaphores as they have no allocation
defer close(sem)
out := make(chan result)
defer close(out)
indexPos := 1
for fscanner.Scan() {
text := fscanner.Text()
wg.Add(1)
sem <- struct{}{}
go func(index int, data string) {
// Defer the release of your resources, otherwise if any error occur in your goroutine
// you'll have a deadlock
defer func() {
wg.Done()
<-sem
}()
// Process your data
out <- result{index, data}
}(indexPos, text) // Pass in the data that will change on the iteration, go optimizer will move it around better
indexPos++
}
// The goroutine is the only one to write to the dict, so no race condition
go func() {
for {
if entry, ok := <-out; ok {
ourDict[entry.index] = entry.data
} else {
return // Exit goroutine when channel closes
}
}
}()
wg.Wait()
for i, v := range ourDict {
fmt.Printf("%d: %sn", i, v)
}
}

好的,我已经想通了。通过复制给goroutine一个值来挂在上面,似乎有效。

改变:

for fscanner.Scan() {
text := fscanner.Text()
wg.Add(1)
sem <- 1
go func() {
mu.Lock()
defer mu.Unlock()
ourDict[indexPos] = text
indexPos++
<- sem
wg.Done()
}()
}

for fscanner.Scan() {
text := fscanner.Text()
wg.Add(1)
sem <- 1
go func(mypos int) {
mu.Lock()
defer mu.Unlock()
ourDict[mypos] = text
<-sem
wg.Done()
}(indexPos)
indexPos++
}

完整代码:https://play.golang.org/p/dkHaisPHyHz

使用工作线程池,

package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
const (
MAX      = 10
fileName = "some.dat"
)
type gunk struct {
line string
id   int
}
func main() {
ourDict := make(map[int]string)
wg := sync.WaitGroup{}
mu := sync.RWMutex{}
cha := make(chan gunk)
for i := 0; i < MAX; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
textin, ok := <-cha
if !ok {
return
}
mu.Lock()
ourDict[textin.id] = textin.line
mu.Unlock()
}
}(i)
}
f, err := os.Open(fileName)
checkerr(err)
defer f.Close()
fscanner := bufio.NewScanner(f)
indexPos := 1
for fscanner.Scan() {
text := fscanner.Text()
thisgunk := gunk{line: text, id: indexPos}
cha <- thisgunk
indexPos++
}
close(cha)
wg.Wait()
for i, v := range ourDict {
fmt.Printf("%d: %sn", i, v)
}
}
func checkerr(err error) {
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
}

最新更新