使用 Go TCP 客户端-服务器实现高吞吐量



我将开发一个简单的TCP客户端和服务器,我希望实现高吞吐量(300000个请求/秒(,这很容易通过服务器硬件上的Cpp或C TCP客户端和服务器到达。我的意思是具有48个内核和64G内存的服务器。

在我的测试平台上,客户端和服务器都有 10G 网络接口卡,我在服务器端启用了接收端扩展,在客户端启用了传输数据包转向。

我将客户端配置为每秒发送 10,000 个请求。我只是从 bash 脚本运行多个 Gogo run client.go实例来增加吞吐量。但是,通过这种方式,Go 将在操作系统上创建大量线程,并且大量线程会导致高上下文切换成本,我无法接近这样的吞吐量。我怀疑我从命令行运行的 Go 实例的数量。下面的代码是该方法中客户端的代码片段:

func Main(cmd_rate_int int, cmd_port string) {
//runtime.GOMAXPROCS(2) // set maximum number of processes to be used by this applications
//var rate float64 = float64(rate_int)
rate := float64(cmd_rate_int)
port = cmd_port
conn, err := net.Dial("tcp", port)
if err != nil {
fmt.Println("ERROR", err)
os.Exit(1)
}
var my_random_number float64 = nextTime(rate) * 1000000
var my_random_int int = int(my_random_number)
var int_message int64 = time.Now().UnixNano()
byte_message := make([]byte, 8)
go func(conn net.Conn) {
buf := make([]byte, 8)
for true {
_, err = io.ReadFull(conn, buf)
now := time.Now().UnixNano()
if err != nil {
return
}
last := int64(binary.LittleEndian.Uint64(buf))
fmt.Println((now - last) / 1000)
}
return
}(conn)
for true {
my_random_number = nextTime(rate) * 1000000
my_random_int = int(my_random_number)
time.Sleep(time.Microsecond * time.Duration(my_random_int))
int_message = time.Now().UnixNano()
binary.LittleEndian.PutUint64(byte_message, uint64(int_message))
conn.Write(byte_message)
}
}

所以我尝试通过在main中调用go client()来运行我所有的 Go 线程,这样我就不会在 Linux 命令行中运行多个实例。我认为这可能是一个更好的主意。基本上,这确实是一个更好的主意,并且在操作系统中线程数不会增加到 700 左右。但是吞吐量仍然很低,似乎没有使用底层硬件的所有功能。实际上,您可能希望查看我在第二种方法中运行的代码:

func main() {
//runtime.GOMAXPROCS(2) // set maximum number of processes to be used by this applications
args := os.Args[1:]
rate_int, _ := strconv.Atoi(args[0])
client_size, _ := strconv.Atoi(args[1])
port := args[2]
i := 0
for i <= client_size {
go client.Main(rate_int, port)
i = i + 1
}
for true {
}
}

我想知道达到高吞吐量的最佳实践是什么?我一直听说 Go 是轻量级的,性能很高,可以与 C/Cpp pthread 相媲美。但是,我认为就性能而言,C/CPP 仍然远远优于 Go。我可能会在这个问题上做一些非常错误的事情,所以如果有人可以帮助使用 Go 实现高吞吐量,我会很高兴。

这是操作代码的快速返工。 由于原始源代码正在工作,它不提供解决方案,但它说明了存储桶令牌的用法,以及其他一些小技巧。

它确实重新使用与操作源代码类似的默认值。

它演示了您不需要两个文件/程序来提供客户端和服务器。

它演示了标志包的用法。

它展示了如何使用时间适当地解析 unix nano 时间戳。Unix(x,y(

它展示了如何利用io。复制以在同一网络上写入您阅读的内容。Conn. 而不是手动编写。

尽管如此,这对于生产交付来说是不合适的。

package main
import (
"encoding/binary"
"flag"
"fmt"
"io"
"log"
"math"
"math/rand"
"net"
"os"
"sync/atomic"
"time"
"github.com/juju/ratelimit"
)
var total_rcv int64
func main() {
var cmd_rate_int float64
var cmd_port string
var client_size int
flag.Float64Var(&cmd_rate_int, "rate", 400000, "change rate of message reading")
flag.StringVar(&cmd_port, "port", ":9090", "port to listen")
flag.IntVar(&client_size, "size", 20, "number of clients")
flag.Parse()
t := flag.Arg(0)
if t == "server" {
server(cmd_port)
} else if t == "client" {
for i := 0; i < client_size; i++ {
go client(cmd_rate_int, cmd_port)
}
// <-make(chan bool) // infinite wait.
<-time.After(time.Second * 2)
fmt.Println("total exchanged", total_rcv)
} else if t == "client_ratelimit" {
bucket := ratelimit.NewBucketWithQuantum(time.Second, int64(cmd_rate_int), int64(cmd_rate_int))
for i := 0; i < client_size; i++ {
go clientRateLimite(bucket, cmd_port)
}
// <-make(chan bool) // infinite wait.
<-time.After(time.Second * 3)
fmt.Println("total exchanged", total_rcv)
}
}
func server(cmd_port string) {
ln, err := net.Listen("tcp", cmd_port)
if err != nil {
panic(err)
}
for {
conn, err := ln.Accept()
if err != nil {
panic(err)
}
go io.Copy(conn, conn)
}
}
func client(cmd_rate_int float64, cmd_port string) {
conn, err := net.Dial("tcp", cmd_port)
if err != nil {
log.Println("ERROR", err)
os.Exit(1)
}
defer conn.Close()
go func(conn net.Conn) {
buf := make([]byte, 8)
for {
_, err := io.ReadFull(conn, buf)
if err != nil {
break
}
// int_message := int64(binary.LittleEndian.Uint64(buf))
// t2 := time.Unix(0, int_message)
// fmt.Println("ROUDNTRIP", time.Now().Sub(t2))
atomic.AddInt64(&total_rcv, 1)
}
return
}(conn)
byte_message := make([]byte, 8)
for {
wait := time.Microsecond * time.Duration(nextTime(cmd_rate_int))
if wait > 0 {
time.Sleep(wait)
fmt.Println("WAIT", wait)
}
int_message := time.Now().UnixNano()
binary.LittleEndian.PutUint64(byte_message, uint64(int_message))
_, err := conn.Write(byte_message)
if err != nil {
log.Println("ERROR", err)
return
}
}
}
func clientRateLimite(bucket *ratelimit.Bucket, cmd_port string) {
conn, err := net.Dial("tcp", cmd_port)
if err != nil {
log.Println("ERROR", err)
os.Exit(1)
}
defer conn.Close()
go func(conn net.Conn) {
buf := make([]byte, 8)
for {
_, err := io.ReadFull(conn, buf)
if err != nil {
break
}
// int_message := int64(binary.LittleEndian.Uint64(buf))
// t2 := time.Unix(0, int_message)
// fmt.Println("ROUDNTRIP", time.Now().Sub(t2))
atomic.AddInt64(&total_rcv, 1)
}
return
}(conn)
byte_message := make([]byte, 8)
for {
bucket.Wait(1)
int_message := time.Now().UnixNano()
binary.LittleEndian.PutUint64(byte_message, uint64(int_message))
_, err := conn.Write(byte_message)
if err != nil {
log.Println("ERROR", err)
return
}
}
}
func nextTime(rate float64) float64 {
return -1 * math.Log(1.0-rand.Float64()) / rate
}

编辑这是一个非常糟糕的答案。查看mh-cbon评论以了解原因。


我不完全明白你是如何做到这一点的,但是如果我想控制 Go 上的速率,我通常会做 2 个嵌套的循环:

for ;; time.Sleep(time.Second) {
go func (){
for i:=0; i<rate; i++ {
go func (){
// Do whatever
}()
}
}()
}

我在每个循环中启动一个goroutine,以便:

  • 在外部循环上,以确保迭代之间只有 1 秒
  • 在内部循环中,以确保我可以启动我想要的所有请求

把它放在像你这样的问题上,它看起来像这样:

package main
import (
"net"
"os"
"time"
)
const (
rate    = 100000
address = "localhost:8090"
)
func main() {
conn, err := net.Dial("tcp", address)
if err != nil {
os.Stderr.Write([]byte(err.Error() + "n"))
os.Exit(1)
}
for ; err == nil; time.Sleep(time.Second) {
go func() {
for i := 0; i < rate; i++ {
go func(conn net.Conn) {
if _, err := conn.Write([]byte("01234567")); err != nil {
os.Stderr.Write([]byte("nConnection closed: " + err.Error() + "n"))
}
}(conn)
}
}()
}
}

要验证这是否确实在发送目标请求速率,您可以有一个如下所示的测试 TCP 侦听器:

package main
import (
"fmt"
"net"
"os"
"time"
)
const (
address = ":8090"
payloadSize = 8
)
func main() {
count := 0
b := make([]byte, payloadSize)
l, err := net.Listen("tcp", address)
if err != nil {
fmt.Fprintf(os.Stdout, "nCan't listen to address %v: %vn", address, err)
return
}

defer l.Close()
go func() {
for ; ; time.Sleep(time.Second) {
fmt.Fprintf(os.Stdout, "rRate: %v/s       ", count)
count = 0
}
}()
for {
conn, err := l.Accept()
if err != nil {
fmt.Fprintf(os.Stderr, "nFailed to accept connection: %vn", err)
}
for {
_, err := conn.Read(b)
if err != nil {
fmt.Fprintf(os.Stderr, "nConnection closed: %vn", err)
break
}
count = count + 1
}
}

}

由于无法同时写入带有错误inconsistent fdMutex的连接,我发现了一些问题。这是由于达到0xfffff多个并发写入,而 fdMutex 不支持这些写入。若要缓解此问题,请确保不要超过该数量的并发写入。在我的系统中,它是>100k/s。这不是您期望的 300k/s,但我的系统没有为此做好准备。

最新更新