增加程序并发性以提高代码效率


func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
// data massaging
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil{
panic(err)
}
fmt.Printf("POST : %sn", status)
return response
}
func callback(rdb *redis.Client) {
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
}
// more work
sendCallback(status, sender, recipient, gatewayMessageId)
}
}

func main() {
rdb := redis.NewClient(&redis.Options{
Addr:     "127.0.0.1:6379",
Password: "",
DB:       0,
})
callback(rdb)
}

我明白上面的代码是有缺陷的,因为我正在等待"data"待处理。然而,我希望它是非阻塞的,并且一旦数据出现,我希望它被处理。我一直在阅读关于程序的教程,但我无法理解它。

编辑

根据@torek的解释,如果我从回调函数中取出无限循环,在主函数中进行,并让回调专注于它的部分,这是goroutine应该如何工作吗?

func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
// data massaging
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil{
panic(err)
}
fmt.Printf("POST : %sn", status)
return response
}
func callback(data string) {
parsedData := make(map[string]interface{})
err := json.Unmarshal([]byte(data), &parsedData)
if err != nil {
panic(err)
}
sender := parsedData["sender"].(string)
recipient := parsedData["recipient"].(string)
gatewayMessageId := parsedData["gateway_message_id"].(string)

sendCallback("sent", sender, recipient, gatewayMessageId)
sendCallback("delivered", sender, recipient, gatewayMessageId)
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr:     "127.0.0.1:6379",
Password: "",
DB:       0,
})
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
}
go callback(data)
}
}

我建议一个工作池解决方案。工作池应该有助于控制CPU和内存的使用。由于上下文切换开销,大量执行CPU密集型操作的例程并不是最优的。但是最重要的好处是可以控制http客户端。您的代码为从redis接收的每条消息创建一个程序。这些例程然后发出HTTP请求。如果在一段时间内,目标服务接收到的redis消息数超过了可处理的HTTP请求数,则会导致目标服务崩溃。或者,如果应用程序由于大量的程序例程而达到内存限制,则可能崩溃。您可以设置MaxConnsPerHost,这将防止不受控制的连接创建,但它不会阻止创建新的运行例程。在我的建议中,应用程序通过利用通道来调整处理速度以适应目标服务的功能。如果您发现目标服务可以处理更多请求,并且您有空闲的CPU功率,则可以增加worker的数量。

type message struct {
Sender           string `json:"sender"`
Recipient        string `json:"recipient"`
GatewayMessageID string `json:"gateway_message_id"`
}
func sendCallback(status string, m message) *http.Response {
// data massaging
...
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil {
log.Fatal(err)
}
fmt.Printf("POST : %sn", status)
return response
}
func worker(messages chan []byte) {
for rawMessage := range  messages {
m := message{}
err := json.Unmarshal(rawMessage, &m)
if err != nil {
log.Fatal(err)
}
sendCallback("sent", m)
sendCallback("delivered", m)
}
}
const numberOfWorkers = 10
func main() {
rdb := redis.NewClient(&redis.Options{
Addr:     "127.0.0.1:6379",
Password: "",
DB:       0,
})
messages := make(chan []byte, numberOfWorkers)
for i := 0; i < numberOfWorkers; i++ {
go worker(messages)
}
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
} else if err != nil {
log.Fatal(err)
}
messages <- []byte(data)
}
}

最新更新