linux之使用 Go TCP 客户端-服务器实现高吞吐量

熊孩纸 阅读:765 2023-08-07 13:44:43 评论:0

我要开发一个简单的 TCP 客户端和服务器,我想实现高吞吐量(每秒 300000 个请求),这很容易通过服务器硬件上的 Cpp 或 C TCP 客户端和服务器实现。我的意思是一台 48 核和 64G 内存的服务器。

在我的测试台上,客户端和服务器都有 10G 网络接口(interface)卡,我在服务器端启用了接收端缩放,在客户端启用了传输数据包控制。

我将客户端配置为每秒发送 10,000 个请求。我只是从 bash 脚本运行多个 Go go run client.go 实例以增加吞吐量。然而,这样一来,Go 就会在操作系统上创建大量的线程,大量的线程导致上下文切换成本很高,我无法达到这样的吞吐量。我怀疑我从命令行运行的 Go 实例的数量。下面的代码是方法中客户端的代码片段:

func Main(cmd_rate_int int, cmd_port string) { 
 
   //runtime.GOMAXPROCS(2) // set maximum number of processes to be used by this applications 
 
   //var rate float64 = float64(rate_int) 
 
   rate := float64(cmd_rate_int) 
 
   port = cmd_port 
 
   conn, err := net.Dial("tcp", port) 
   if err != nil { 
       fmt.Println("ERROR", err) 
       os.Exit(1) 
   } 
 
   var my_random_number float64 = nextTime(rate) * 1000000 
   var my_random_int int = int(my_random_number) 
   var int_message int64 = time.Now().UnixNano() 
   byte_message := make([]byte, 8) 
 
   go func(conn net.Conn) { 
       buf := make([]byte, 8) 
 
       for true { 
           _, err = io.ReadFull(conn, buf) 
           now := time.Now().UnixNano() 
 
           if err != nil { 
               return 
           } 
 
           last := int64(binary.LittleEndian.Uint64(buf)) 
           fmt.Println((now - last) / 1000) 
       } 
       return 
 
   }(conn) 
 
   for true { 
       my_random_number = nextTime(rate) * 1000000 
       my_random_int = int(my_random_number) 
       time.Sleep(time.Microsecond * time.Duration(my_random_int)) 
       int_message = time.Now().UnixNano() 
       binary.LittleEndian.PutUint64(byte_message, uint64(int_message)) 
       conn.Write(byte_message) 
   } 
} 

所以我尝试通过在 main 中调用 go client() 来运行我所有的 Go 线程,这样我就不会在 Linux 命令行中运行多个实例。我认为这可能是个更好的主意。这基本上是一个更好的主意,操作系统中的线程数不会增加到 700 左右。但是吞吐量仍然很低,而且似乎没有使用底层硬件的所有功能。实际上,您可能想看看我在第二种方法中运行的代码:

func main() { 
 
   //runtime.GOMAXPROCS(2) // set maximum number of processes to be used by this applications 
   args := os.Args[1:] 
   rate_int, _ := strconv.Atoi(args[0]) 
   client_size, _ := strconv.Atoi(args[1]) 
   port := args[2] 
 
   i := 0 
   for i <= client_size { 
       go client.Main(rate_int, port) 
       i = i + 1 
   } 
 
   for true { 
 
   } 
} 

我想知道达到高吞吐量的最佳实践是什么?我一直听说 Go 是轻量级和高性能的,可以与 C/Cpp pthread 相媲美。但是,我认为就性能而言,C/Cpp 仍然远远优于 Go。我可能会在这个问题上做错一些事情,所以如果有人可以帮助使用 Go 实现高吞吐量,我会很高兴。

请您参考如下方法:

这是操作代码的快速返工。 由于原始源代码是有效的,它没有提供解决方案,但它说明了存储桶 token 的用法和其他一些小的 go 技巧。

它确实重新使用了与 op 源代码类似的默认值。

它表明您不需要两个文件/程序来同时提供客户端和服务器。

它演示了标志包的用法。

它展示了如何使用 time.Unix(x,y) 正确解析 unix nano 时间戳

它展示了如何利用 io.Copy 在同一个 net.Conn 上写入所读内容。而不是手动编写。

不过,这对于生产交付来说是不合适的。

package main 
 
import ( 
    "encoding/binary" 
    "flag" 
    "fmt" 
    "io" 
    "log" 
    "math" 
    "math/rand" 
    "net" 
    "os" 
    "sync/atomic" 
    "time" 
 
    "github.com/juju/ratelimit" 
) 
 
var total_rcv int64 
 
func main() { 
 
    var cmd_rate_int float64 
    var cmd_port string 
    var client_size int 
 
    flag.Float64Var(&cmd_rate_int, "rate", 400000, "change rate of message reading") 
    flag.StringVar(&cmd_port, "port", ":9090", "port to listen") 
    flag.IntVar(&client_size, "size", 20, "number of clients") 
 
    flag.Parse() 
 
    t := flag.Arg(0) 
 
    if t == "server" { 
        server(cmd_port) 
 
    } else if t == "client" { 
        for i := 0; i < client_size; i++ { 
            go client(cmd_rate_int, cmd_port) 
        } 
        // <-make(chan bool) // infinite wait. 
        <-time.After(time.Second * 2) 
        fmt.Println("total exchanged", total_rcv) 
 
    } else if t == "client_ratelimit" { 
        bucket := ratelimit.NewBucketWithQuantum(time.Second, int64(cmd_rate_int), int64(cmd_rate_int)) 
        for i := 0; i < client_size; i++ { 
            go clientRateLimite(bucket, cmd_port) 
        } 
        // <-make(chan bool) // infinite wait. 
        <-time.After(time.Second * 3) 
        fmt.Println("total exchanged", total_rcv) 
    } 
} 
 
func server(cmd_port string) { 
    ln, err := net.Listen("tcp", cmd_port) 
    if err != nil { 
        panic(err) 
    } 
 
    for { 
        conn, err := ln.Accept() 
        if err != nil { 
            panic(err) 
        } 
        go io.Copy(conn, conn) 
    } 
} 
 
func client(cmd_rate_int float64, cmd_port string) { 
 
    conn, err := net.Dial("tcp", cmd_port) 
    if err != nil { 
        log.Println("ERROR", err) 
        os.Exit(1) 
    } 
    defer conn.Close() 
 
    go func(conn net.Conn) { 
        buf := make([]byte, 8) 
        for { 
            _, err := io.ReadFull(conn, buf) 
            if err != nil { 
                break 
            } 
            // int_message := int64(binary.LittleEndian.Uint64(buf)) 
            // t2 := time.Unix(0, int_message) 
            // fmt.Println("ROUDNTRIP", time.Now().Sub(t2)) 
            atomic.AddInt64(&total_rcv, 1) 
        } 
        return 
    }(conn) 
 
    byte_message := make([]byte, 8) 
    for { 
        wait := time.Microsecond * time.Duration(nextTime(cmd_rate_int)) 
        if wait > 0 { 
            time.Sleep(wait) 
            fmt.Println("WAIT", wait) 
        } 
        int_message := time.Now().UnixNano() 
        binary.LittleEndian.PutUint64(byte_message, uint64(int_message)) 
        _, err := conn.Write(byte_message) 
        if err != nil { 
            log.Println("ERROR", err) 
            return 
        } 
    } 
} 
 
func clientRateLimite(bucket *ratelimit.Bucket, cmd_port string) { 
 
    conn, err := net.Dial("tcp", cmd_port) 
    if err != nil { 
        log.Println("ERROR", err) 
        os.Exit(1) 
    } 
    defer conn.Close() 
 
    go func(conn net.Conn) { 
        buf := make([]byte, 8) 
        for { 
            _, err := io.ReadFull(conn, buf) 
            if err != nil { 
                break 
            } 
            // int_message := int64(binary.LittleEndian.Uint64(buf)) 
            // t2 := time.Unix(0, int_message) 
            // fmt.Println("ROUDNTRIP", time.Now().Sub(t2)) 
            atomic.AddInt64(&total_rcv, 1) 
        } 
        return 
    }(conn) 
 
    byte_message := make([]byte, 8) 
    for { 
        bucket.Wait(1) 
        int_message := time.Now().UnixNano() 
        binary.LittleEndian.PutUint64(byte_message, uint64(int_message)) 
        _, err := conn.Write(byte_message) 
        if err != nil { 
            log.Println("ERROR", err) 
            return 
        } 
    } 
} 
 
func nextTime(rate float64) float64 { 
    return -1 * math.Log(1.0-rand.Float64()) / rate 
} 


标签:linux
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

全民解析

全民解析

关注我们