在Go中使用通道并发地接收响应和写SQL



我正在使用Go从外部API实现JSON数据的管道,处理消息,然后发送到SQL数据库。

我正在尝试并发运行API请求,然后在我返回响应后,我想通过load()通过另一个例程将其插入DB。

在我下面的代码中,有时我会在load()函数中收到我的log.Printf(),有时我不会。这表明我可能关闭了一个频道或没有正确设置通信。

我尝试的模式是这样的:

package main
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"time"
)
type Request struct {
url string
}
type Response struct {
status  int
args    Args    `json:"args"`
headers Headers `json:"headers"`
origin  string  `json:"origin"`
url     string  `json:"url"`
}
type Args struct {
}
type Headers struct {
accept string `json:"Accept"`
}
func main() {
start := time.Now()
numRequests := 5
responses := make(chan Response, 5)
defer close(responses)
for i := 0; i < numRequests; i++ {
req := Request{url: "https://httpbin.org/get"}
go func(req *Request) {
resp, err := extract(req)
if err != nil {
log.Fatal("Error extracting data from API")
return
}
// Send response to channel
responses <- resp
}(&req)
// Perform go routine to load data
go load(responses)
}
log.Println("Execution time: ", time.Since(start))
}
func extract(req *Request) (r Response, err error) {
var resp Response
request, err := http.NewRequest("GET", req.url, nil)
if err != nil {
return resp, err
}
request.Header = http.Header{
"accept": {"application/json"},
}
response, err := http.DefaultClient.Do(request)
defer response.Body.Close()
if err != nil {
log.Fatal("Error")
return resp, err
}
// Read response data
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Fatal("Error")
return resp, err
}
json.Unmarshal(body, &resp)
resp.status = response.StatusCode
return resp, nil
}
type Record struct {
origin string
url    string
}
func load(ch chan Response) {
// Read response from channel
resp := <-ch
// Process the response data
records := process(resp)
log.Printf("%+vn", records)
// Load data to db stuff here
}
func process(resp Response) (record Record) {
// Process the response struct as needed to get a record of data to insert to DB
return record
}

程序在工作完成之前没有防止完成的保护。因此,有时程序在程序完成之前就终止了。

要防止这种情况,请使用WaitGroup:

wg:=sync.WaitGroup{}
for i := 0; i < numRequests; i++ {
...
wg.Add(1)
go func() {
defer wg.Done()
load(responses)
}()
}
wg.Wait()

最新更新