并发解析 JSON - 运行时错误崩溃(解码相关)



我最近在玩围棋,但遇到了运行时错误,我无法解释。这些是我的工作职能。

type User struct {
Browsers []string `json:"browsers"`
Name     string   `json:"name"`
Email    string   `json:"email"`
}
func asyncUserProcJson(wg *sync.WaitGroup, users *[]User, ch chan []byte) {
for buf := range ch {
var mu sync.Mutex
var user User
mu.Lock()
err := json.Unmarshal(buf, &user)
mu.Unlock()
if err != nil {
fmt.Println("json:", err)
wg.Done()
continue
}
*users = append(*users, user)
wg.Done()
}
}
func userProcJson(buf []byte) (User, error) {
var user User
err := json.Unmarshal(buf, &user)
if err != nil {
return User{}, err
}
return user, nil
}

如果我做一个常见的 - 非并发 aproach,它按预期工作。但是,如果尝试使用通道将字节传递到goroutine...它失败了。

type AsyncUserProc func(*sync.WaitGroup, *[]User, chan []byte)
type UserProc func(buf []byte) (User, error)
type SearchParams struct {
out              io.Writer
asyncUserProc    AsyncUserProc 
userProc         UserProc 
}
func (sp SearchParams) AsyncSearch() []User {
file, err := os.Open(filePath)
if err != nil {
log.Fatalln(err)
}
var Users = make([]User, 0, 1024)
var ch = make(chan []byte)
var wg sync.WaitGroup
go sp.asyncUserProcess(&wg, &Users, ch)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
wg.Add(1)
ch <- scanner.Bytes()
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading standard input:", err)
}
close(ch)
wg.Wait()
return Users
}
func (sp SearchParams) Search() []User {
file, err := os.Open(filePath)
if err != nil {
log.Fatalln(err)
}
// json processor
var Users = make([]User, 0, 1024)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
u, err := sp.userProcess(scanner.Bytes())
if err != nil {
log.Panicln(err)
continue
}
Users = append(Users, u)
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading standard input:", err)
}
return Users
}

工作流是下一个:

  1. filePath 包含一个 JSON 块(每个块都在新行上(
  2. 打开阅读。
  3. 创建线扫描仪

    1. (异步搜索(

      • 通过线路到通道。
      • 从范围返回线的值(阻塞操作(
      • 传递给 JSON。解组
      • 麻烦
    2. (搜索(

      • 将行直接传递给用户Proc 功能
      • 享受结果

我收到很多(不同的(错误。

  • 很多 JSON 解组错误。
  • 索引超出范围
  • JSON 解码器不同步 - 数据在脚下更改?

作为最后一个错误的描述:

// phasePanicMsg is used as a panic message when we end up with something that
// shouldn't happen. It can indicate a bug in the JSON decoder, or that
// something is editing the data slice while the decoder executes.

所以这里有一个问题:字节切片是如何修改的?我以为这是阻塞操作。我在语言力学中缺少什么?

错误示例(每次运行不同(

json: invalid character 'i' looking for beginning of value
json: invalid character ':' after top-level value
json: invalid character 'r' looking for beginning of value
panic: runtime error: index out of range
----
json: invalid character '.' after top-level value
json: invalid character 'K' looking for beginning of value
panic: JSON decoder out of sync - data changing underfoot?

Package bufio

import "bufio"

函数 (*扫描仪( 字节

func (s *Scanner) Bytes() []byte

字节返回调用扫描生成的最新令牌。这 基础数组可能指向将被 后续调用扫描。它不进行分配。


基础数组可能指向将被后续 Scan 调用覆盖的数据。

最新更新