为了账号安全,请及时绑定邮箱和手机立即绑定

Go net/http 在高负载下泄漏内存

Go net/http 在高负载下泄漏内存

Go
森栏 2023-08-14 16:32:32
我正在开发一个使用该net/http包调用客户端 URL 的 API。根据用户国家/操作系统,goroutines 中的每个请求(POST 调用)会同时调用 1 到 8 个 URL。该应用程序以大约 1000-1500 个请求的低 qps 运行,但将应用程序扩展到 3k 请求时,内存会突然增加,即使仅调用 1 个客户端 URL,应用程序也会在几分钟后停止响应(响应时间远高于 50 秒) )。我正在使用 Go 本机net/http包和gorilla/mux路由器。关于这个问题的其他问题说要关闭响应正文,但我已经使用了        req, err := http.NewRequest("POST", "client_post_url", bytes.NewBuffer(requestBody))        req.Header.Set("Content-Type", "application/json")        req.Header.Set("Connection", "Keep-Alive")        response, err := common.Client.Do(req)        status := 0        if err != nil {//handle and return}        defer response.Body.Close() //used with/without io.Copy        status = response.StatusCode        body, _ := ioutil.ReadAll(response.Body)        _, err = io.Copy(ioutil.Discard, response.Body)我需要重用连接,因此我已经在 init 方法中初始化了 http 客户端和传输全局变量,如下所示。    common.Transport = &http.Transport{        TLSClientConfig: &tls.Config{            InsecureSkipVerify: true,        },        DialContext: (&net.Dialer{            //Timeout: time.Duration(300) * time.Millisecond,            KeepAlive: 30 * time.Second,        }).DialContext,        //ForceAttemptHTTP2:     true,        DisableKeepAlives: false,        //MaxIdleConns:      0,        //IdleConnTimeout:   0,        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,        //ExpectContinueTimeout: 1 * time.Second,    }    common.Client = &http.Client{        Timeout:   time.Duration(300) * time.Millisecond,        Transport: common.Transport,    }我读过使用 keep-alive 会导致内存泄漏,我尝试了一些组合来根据请求禁用 keep-alive/close 请求标志。但似乎没有任何作用。另外,如果我不进行任何 http 调用并time.Sleep(300 * time.Millisecond)在同时调用每个 url 的 goroutine 中使用,则应用程序可以正常工作而不会出现任何泄漏。所以我确信这与 client/http 包在高负载下连接未释放或未正确使用有关。我应该采取什么方法来实现这一目标?创建自定义服务器和自定义处理程序类型来接受请求和路由请求是否会像几篇文章中的 C10K 方法中提到的那样工作?如果需要,我可以分享包含所有详细信息的示例代码。上面只是补充了我觉得问题所在的部分。
查看完整描述

2 回答

?
慕无忌1623718

TA贡献1744条经验 获得超4个赞

这段代码没有泄露。


为了演示,让我们稍微更新一下它,以便该帖子可以重现。


主程序


package main


import (

    "bytes"

    "crypto/tls"

    _ "expvar"

    "fmt"

    "io"

    "io/ioutil"

    "log"

    "math/rand"

    "net"

    "net/http"

    _ "net/http/pprof"

    "os"

    "runtime"

    "strconv"

    "sync"

    "time"


    "github.com/gorilla/mux"

)


var (

    //http client

    Client *http.Client


    //http Transport

    Transport *http.Transport

)



func init() {


    go http.ListenAndServe("localhost:6060", nil)


    //Get Any command line argument passed

    args := os.Args[1:]

    numCPU := runtime.NumCPU()

    if len(args) > 1 {

        numCPU, _ = strconv.Atoi(args[0])

    }


    Transport = &http.Transport{

        TLSClientConfig: &tls.Config{

            InsecureSkipVerify: true,

        },

        DialContext: (&net.Dialer{

            //Timeout: time.Duration() * time.Millisecond,

            KeepAlive: 30 * time.Second,

        }).DialContext,

        //ForceAttemptHTTP2:     true,

        DisableKeepAlives: false,

        //MaxIdleConns:      0,

        //IdleConnTimeout:   0,

        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,

        //ExpectContinueTimeout: 1 * time.Second,

    }


    Client = &http.Client{

        // Timeout:   time.Duration(300) * time.Millisecond,

        Transport: Transport,

    }


    runtime.GOMAXPROCS(numCPU)


    rand.Seed(time.Now().UTC().UnixNano())

}


func main() {


    router := mux.NewRouter().StrictSlash(true)

    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        _, _ = fmt.Fprintf(w, "Hello!!!")

    })


    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {

        vars := mux.Vars(r)


        prepareRequest(w, r, vars["name"])


    }).Methods("POST", "GET")


    // Register pprof handlers

    // router.HandleFunc("/debug/pprof/", pprof.Index)

    // router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

    // router.HandleFunc("/debug/pprof/profile", pprof.Profile)

    // router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)

    // router.HandleFunc("/debug/pprof/trace", pprof.Trace)


    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")


    srv := &http.Server{

        Addr: "localhost:8080",

        /*ReadTimeout:  500 * time.Millisecond,

          WriteTimeout: 500 * time.Millisecond,

          IdleTimeout:  10 * time.Second,*/

        Handler: routerMiddleWare,

    }


    log.Fatal(srv.ListenAndServe())

}


func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    // go func() {

    //  make(chan []byte) <- make([]byte, 10024)

    // }()


    //other part of the code and call to goroutine

    var urls []string

    urls = append(urls,

        "http://localhost:7000/",

        "http://localhost:7000/",

    )

    results, s, c := callUrls(urls)

    finalCall(w, results, s, c)


}


type Response struct {

    Status int

    Url    string

    Body   string

}


func callUrls(urls []string) ([]*Response, []string, []string) {

    var wg sync.WaitGroup

    wg.Add(len(urls))

    ch := make(chan func() (*Response, string, string), len(urls))

    for _, url := range urls {

        go func(url string) {

            //decide if request is valid for client to make http call using country/os

            isValid := true //assuming url to be called

            if isValid {

                //make post call

                //request body have many more paramter, just sample included.

                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.

                req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))

                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"

                    }

                    return

                }

                req.Header.Set("Content-Type", "application/json")

                req.Header.Set("Connection", "Keep-Alive")

                //req.Close = true


                response, err := Client.Do(req)


                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"

                    }

                    return

                }


                defer response.Body.Close()

                body, _ := ioutil.ReadAll(response.Body)

                io.Copy(ioutil.Discard, response.Body)


                //Close the body, forced this

                //Also tried without defer, and only wothout following line

                response.Body.Close()


                //do something with response body replace a few string etc.

                //and return

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"

                }


            } else {

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"

                }

            }


        }(url)

    }

    wg.Wait()

    var (

        results []*Response

        msg     []string

        status  []string

    )

    for {

        r, x, y := (<-ch)()

        if r != nil {


            results = append(results, r)

            msg = append(msg, x)

            status = append(status, y)

        }

        if len(results) == len(urls) {

            return results, msg, status

        }


    }

}


func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {

    fmt.Println("response", "response body", results, msg, status)

}

k/main.go


package main


import "net/http"


func main() {

    y := make([]byte, 100)

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        w.WriteHeader(http.StatusOK)

        w.Write(y)

    })

    http.ListenAndServe(":7000", nil)

}

安装额外的可视化工具,并用来ab模拟一些负载,它就可以完成直观的演示。


go get -u github.com/divan/expvarmon

go run main.go &

go run k/main.go &

ab -n 50000 -c 2500 http://localhost:8080/y

# in a different window, for live preview

expvarmon -ports=6060 -i 500ms

此时您会读取 的输出expvarmon,如果它是实时的,您会看到类似的内容

https://img1.sycdn.imooc.com//64d9e6de00018d3106520402.jpg

你可以看到那些东西在挥舞,GC 正在积极工作。

应用程序已加载,内存正在消耗,等待服务器释放其 conn 并等待 gc 清理它们

https://img1.sycdn.imooc.com//64d9e6ec0001faeb06520244.jpg

您可以看到memstats.Allocmemstats.HeapAlloc,memstats.HeapInuse现在减少了,正如 GC 完成其工作时所预期的那样,并且不存在泄漏。

如果你要在运行go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap后立即检查ab

https://img1.sycdn.imooc.com//64d9e7020001618725601236.jpg

它表明该应用程序正在使用177Mb内存。


其中大部分102Mb正在被使用net/http.Transport.getConn。


你的经纪人正在负责1Mb,剩下的就是需要做的各种事情。


如果您在服务器和 gc 发布后截取屏幕截图,您会看到一个更小的图表。这里就不演示了。


现在让我们生成一个泄漏并再次使用这两个工具查看它。


在代码中取消注释,



func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    go func() {

        make(chan []byte) <- make([]byte, 10024)

    }()

//...

重新启动应用程序(按q,expvarmon但不是必需的)


go get -u github.com/divan/expvarmon

go run main.go &

go run k/main.go &

ab -n 50000 -c 2500 http://localhost:8080/y

# in a different window, for live preview

expvarmon -ports=6060 -i 500ms

表明

https://img1.sycdn.imooc.com//64d9e7150001104609060339.jpg

https://img1.sycdn.imooc.com//64d9e71e000125dd09040360.jpg

expvarmon可以看到相同的行为,只有数字发生了变化,并且在静止状态下,在被 gced 后,它仍然消耗大量内存,比作为比较点的 void golang http 服务器要多得多。

再次,截取堆的屏幕截图,它显示您的处理程序现在正在消耗大部分内存 ~ 450Mb,注意箭头,它显示有 for452mb分配10kb4.50Mbof 96b。它们分别对应于[]byte被推送到的切片chan []byte

https://img1.sycdn.imooc.com//64d9e72c0001294b25600941.jpg

最后,您可以检查堆栈跟踪以查找死 goroutine,从而泄漏内存,打开http://localhost:6060/debug/pprof/goroutine?debug=1


goroutine profile: total 50012


50000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281

#   0x76b85c    main.prepareRequest.func1+0x4c  /home/mh-cbon/gow/src/test/oom/main.go:101


4 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281

#   0x42b685    internal/poll.runtime_pollWait+0x55     /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182

#   0x4c3a3a    internal/poll.(*pollDesc).wait+0x9a     /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87

// more...

它告诉我们程序正在托管goroutine,然后它按文件位置分组列出它们,其中第一个数字是本示例第一组中50 012正在运行的实例的计数。50 000接下来是导致 goroutine 存在的堆栈跟踪。

您可以看到有很多系统问题,就您的情况而言,您不必太担心。

您必须寻找那些您认为如果您的程序按您认为应该的方式运行则不应运行的程序。

然而,总体而言,您的代码并不令人满意,并且可以并且可能应该通过对其分配和总体设计概念进行彻底审查来改进。

** 这是对原始源代码所做的更改的摘要。

  • 它添加了一个新程序k/main.go来充当后端服务器。

  • 它添加了_ "expvar"导入语句

  • init它启动 pprof 在阶段期间注册的 std api HTTP 服务器实例go http.ListenAndServe("localhost:6060", nil)

  • 禁用客户端超时Timeout:   time.Duration(300) * time.Millisecond,,否则负载测试不会返回200s

  • 服务器地址设置为Addr: "localhost:8080",

  • urls其中创建的值设置prepareRequest为 len=2 的静态列表

  • req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(它添加了对{"body":"param"}的错误检查)))

  • 它禁用错误检查io.Copy(ioutil.Discard, response.Body)


查看完整回答
反对 回复 2023-08-14
?
largeQ

TA贡献2039条经验 获得超7个赞

我已经通过用 替换net/httppackage解决了这个问题fasthttp。早些时候我没有使用它,因为我无法在 fasthttp 客户端上找到超时方法,但我发现DoTimeoutfasthttp 客户端确实有一种方法,可以在指定的持续时间后使请求超时。

这里是更新的代码:

vars.go中 ClientFastHttp *fasthttp.Client

主程序

package main


import (

    "./common"

    "crypto/tls"

    "fmt"

    "github.com/gorilla/mux"

    "github.com/valyala/fasthttp"

    "log"

    "math/rand"

    "net"

    "net/http"

    "net/http/pprof"

    "os"

    "runtime"

    "strconv"

    "sync"

    "time"

)


func init() {


    //Get Any command line argument passed

    args := os.Args[1:]

    numCPU := runtime.NumCPU()

    if len(args) > 1 {

        numCPU, _ = strconv.Atoi(args[0])

    }


    common.Transport = &http.Transport{

        TLSClientConfig: &tls.Config{

            InsecureSkipVerify: true,

        },

        DialContext: (&net.Dialer{

            //Timeout: time.Duration() * time.Millisecond,

            KeepAlive: 30 * time.Second,

        }).DialContext,

        //ForceAttemptHTTP2:     true,

        DisableKeepAlives: false,

        //MaxIdleConns:      0,

        //IdleConnTimeout:   0,

        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,

        //ExpectContinueTimeout: 1 * time.Second,

    }


    common.Client = &http.Client{

        Timeout:   time.Duration(300) * time.Millisecond,

        Transport: common.Transport,

    }


    runtime.GOMAXPROCS(numCPU)


    rand.Seed(time.Now().UTC().UnixNano())

}


func main() {


    router := mux.NewRouter().StrictSlash(true)

    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        _, _ = fmt.Fprintf(w, "Hello!!!")

    })


    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {

        vars := mux.Vars(r)


        prepareRequest(w, r, vars["name"])


    }).Methods("POST")


    // Register pprof handlers

    router.HandleFunc("/debug/pprof/", pprof.Index)

    router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

    router.HandleFunc("/debug/pprof/profile", pprof.Profile)

    router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)

    router.HandleFunc("/debug/pprof/trace", pprof.Trace)


    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")


    srv := &http.Server{

        Addr: "0.0.0.0:" + "80",

        /*ReadTimeout:  500 * time.Millisecond,

        WriteTimeout: 500 * time.Millisecond,

        IdleTimeout:  10 * time.Second,*/

        Handler: routerMiddleWare,

    }


    log.Fatal(srv.ListenAndServe())

}


func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    //other part of the code and call to goroutine

    var urls []string

    results, s, c := callUrls(urls)

    finalCall(w, results, s, c)


}


type Response struct {

    Status int

    Url    string

    Body   string

}


func callUrls(urls []string) ([]*Response, []string, []string) {

    var wg sync.WaitGroup

    wg.Add(len(urls))

    ch := make(chan func() (*Response, string, string), len(urls))

    for _, url := range urls {

        go func(url string) {

            //decide if request is valid for client to make http call using country/os

            isValid := true //assuming url to be called

            if isValid {

                //make post call

                //request body have many more paramter, just sample included.

                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.

                req := fasthttp.AcquireRequest()

                req.SetRequestURI(url)

                req.Header.Set("Content-Type", "application/json")

                req.Header.Set("Connection", "Keep-Alive")

                req.Header.SetMethod("POST")

                req.SetBody([]byte(`{"body":"param"}`))


                resp := fasthttp.AcquireResponse()


                defer fasthttp.ReleaseRequest(req)   // <- do not forget to release

                defer fasthttp.ReleaseResponse(resp) // <- do not forget to release


                //err := clientFastHttp.Do(req, response)

                //endregion

                t := time.Duration(300)


                err := common.ClientFastHttp.DoTimeout(req, resp, t*time.Millisecond)


                body := resp.Body()


                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, "error", "500"

                    }

                    return

                }


                /*defer response.Body.Close()

                body, _ := ioutil.ReadAll(response.Body)

                _, err = io.Copy(ioutil.Discard, response.Body)


                //Close the body, forced this

                //Also tried without defer, and only wothout following line

                response.Body.Close()*/


                //do something with response body replace a few string etc.

                //and return

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"

                }


            } else {

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"

                }

            }


        }(url)

    }

    wg.Wait()

    var (

        results []*Response

        msg     []string

        status  []string

    )

    for {

        r, x, y := (<-ch)()

        if r != nil {


            results = append(results, r)

            msg = append(msg, x)

            status = append(status, y)

        }

        if len(results) == len(urls) {

            return results, msg, status

        }


    }

}


func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {

    fmt.Println("response", "response body", results, msg, status)

}


查看完整回答
反对 回复 2023-08-14
  • 2 回答
  • 0 关注
  • 171 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信