为了账号安全,请及时绑定邮箱和手机立即绑定

Go net/http 在高负载下泄漏内存

Go net/http 在高负载下泄漏内存

Go
GCT1015 2022-04-26 15:03:38
我正在开发一个使用net/http包调用客户端 URL 的 API。根据用户国家/地区,goroutine 中的每个请求(POST 调用)同时调用 1 到 8 个 URL。该应用程序适用于大约 1000-1500 个请求的低 qps,但是将应用程序扩展到 3k 个请求时,即使仅调用 1 个客户端 URL,应用程序也会在几分钟后停止响应(响应时间远高于 50 秒),内存会突然增加)。我正在使用 Go 本机net/http包和gorilla/mux路由器。关于这个问题的其他问题说要关闭响应主体,但我已经使用        req, err := http.NewRequest("POST", "client_post_url", bytes.NewBuffer(requestBody))        req.Header.Set("Content-Type", "application/json")        req.Header.Set("Connection", "Keep-Alive")        response, err := common.Client.Do(req)        status := 0        if err != nil {//handle and return}        defer response.Body.Close() //used with/without io.Copy        status = response.StatusCode        body, _ := ioutil.ReadAll(response.Body)        _, err = io.Copy(ioutil.Discard, response.Body)我需要重用连接,因此我在 init 方法中创建了 http 客户端和传输全局变量,就像这样。    common.Transport = &http.Transport{        TLSClientConfig: &tls.Config{            InsecureSkipVerify: true,        },        DialContext: (&net.Dialer{            //Timeout: time.Duration(300) * time.Millisecond,            KeepAlive: 30 * time.Second,        }).DialContext,        //ForceAttemptHTTP2:     true,        DisableKeepAlives: false,        //MaxIdleConns:      0,        //IdleConnTimeout:   0,        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,        //ExpectContinueTimeout: 1 * time.Second,    }    common.Client = &http.Client{        Timeout:   time.Duration(300) * time.Millisecond,        Transport: common.Transport,    }我已经读过使用保持活动会导致内存泄漏,我尝试了一些组合来根据请求禁用保持活动/关闭请求标志。但似乎没有任何效果。此外,如果我不进行任何 http 调用并time.Sleep(300 * time.Millisecond)在 goroutine 中使用同时调用每个 url,则应用程序确实可以正常工作而没有任何泄漏。所以我确信它与客户端/http包有关,在高负载连接下没有释放或没有正确使用。我应该采取什么方法来实现这一目标?是否创建自定义服务器和自定义处理程序类型来接受请求和路由请求将如几篇文章中的 C10K 方法中所述那样工作?如果需要,我可以与所有详细信息共享示例代码。上面刚刚补充说我觉得问题所在的部分。
查看完整描述

2 回答

?
慕标5832272

TA贡献1966条经验 获得超4个赞

此代码没有泄漏。


为了演示,让我们稍微更新一下**,这样帖子就可以重现了。


main.go


package main


import (

    "bytes"

    "crypto/tls"

    _ "expvar"

    "fmt"

    "io"

    "io/ioutil"

    "log"

    "math/rand"

    "net"

    "net/http"

    _ "net/http/pprof"

    "os"

    "runtime"

    "strconv"

    "sync"

    "time"


    "github.com/gorilla/mux"

)


var (

    //http client

    Client *http.Client


    //http Transport

    Transport *http.Transport

)



func init() {


    go http.ListenAndServe("localhost:6060", nil)


    //Get Any command line argument passed

    args := os.Args[1:]

    numCPU := runtime.NumCPU()

    if len(args) > 1 {

        numCPU, _ = strconv.Atoi(args[0])

    }


    Transport = &http.Transport{

        TLSClientConfig: &tls.Config{

            InsecureSkipVerify: true,

        },

        DialContext: (&net.Dialer{

            //Timeout: time.Duration() * time.Millisecond,

            KeepAlive: 30 * time.Second,

        }).DialContext,

        //ForceAttemptHTTP2:     true,

        DisableKeepAlives: false,

        //MaxIdleConns:      0,

        //IdleConnTimeout:   0,

        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,

        //ExpectContinueTimeout: 1 * time.Second,

    }


    Client = &http.Client{

        // Timeout:   time.Duration(300) * time.Millisecond,

        Transport: Transport,

    }


    runtime.GOMAXPROCS(numCPU)


    rand.Seed(time.Now().UTC().UnixNano())

}


func main() {


    router := mux.NewRouter().StrictSlash(true)

    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        _, _ = fmt.Fprintf(w, "Hello!!!")

    })


    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {

        vars := mux.Vars(r)


        prepareRequest(w, r, vars["name"])


    }).Methods("POST", "GET")


    // Register pprof handlers

    // router.HandleFunc("/debug/pprof/", pprof.Index)

    // router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

    // router.HandleFunc("/debug/pprof/profile", pprof.Profile)

    // router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)

    // router.HandleFunc("/debug/pprof/trace", pprof.Trace)


    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")


    srv := &http.Server{

        Addr: "localhost:8080",

        /*ReadTimeout:  500 * time.Millisecond,

          WriteTimeout: 500 * time.Millisecond,

          IdleTimeout:  10 * time.Second,*/

        Handler: routerMiddleWare,

    }


    log.Fatal(srv.ListenAndServe())

}


func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    // go func() {

    //  make(chan []byte) <- make([]byte, 10024)

    // }()


    //other part of the code and call to goroutine

    var urls []string

    urls = append(urls,

        "http://localhost:7000/",

        "http://localhost:7000/",

    )

    results, s, c := callUrls(urls)

    finalCall(w, results, s, c)


}


type Response struct {

    Status int

    Url    string

    Body   string

}


func callUrls(urls []string) ([]*Response, []string, []string) {

    var wg sync.WaitGroup

    wg.Add(len(urls))

    ch := make(chan func() (*Response, string, string), len(urls))

    for _, url := range urls {

        go func(url string) {

            //decide if request is valid for client to make http call using country/os

            isValid := true //assuming url to be called

            if isValid {

                //make post call

                //request body have many more paramter, just sample included.

                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.

                req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))

                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"

                    }

                    return

                }

                req.Header.Set("Content-Type", "application/json")

                req.Header.Set("Connection", "Keep-Alive")

                //req.Close = true


                response, err := Client.Do(req)


                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"

                    }

                    return

                }


                defer response.Body.Close()

                body, _ := ioutil.ReadAll(response.Body)

                io.Copy(ioutil.Discard, response.Body)


                //Close the body, forced this

                //Also tried without defer, and only wothout following line

                response.Body.Close()


                //do something with response body replace a few string etc.

                //and return

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"

                }


            } else {

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"

                }

            }


        }(url)

    }

    wg.Wait()

    var (

        results []*Response

        msg     []string

        status  []string

    )

    for {

        r, x, y := (<-ch)()

        if r != nil {


            results = append(results, r)

            msg = append(msg, x)

            status = append(status, y)

        }

        if len(results) == len(urls) {

            return results, msg, status

        }


    }

}


func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {

    fmt.Println("response", "response body", results, msg, status)

}

k/main.go


package main


import "net/http"


func main() {

    y := make([]byte, 100)

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        w.WriteHeader(http.StatusOK)

        w.Write(y)

    })

    http.ListenAndServe(":7000", nil)

}

安装额外的可视化工具,并用于ab模拟一些负载,它将完成直观演示的工作。


go get -u github.com/divan/expvarmon

go run main.go &

go run k/main.go &

ab -n 50000 -c 2500 http://localhost:8080/y

# in a different window, for live preview

expvarmon -ports=6060 -i 500ms

那时你阅读了 的输出expvarmon,如果它是现场的,你有类似的东西

//img1.sycdn.imooc.com//62679cf300013c8e08990508.jpg

你可以看到东西在挥动,gc 正在积极工作。

应用程序已加载,正在消耗内存,等待服务器释放其 conn 和 gc 清理它们

//img1.sycdn.imooc.com//62679d010001927d09000337.jpg

您可以看到memstats.Allocmemstats.HeapAlloc,memstats.HeapInuse现在减少了,正如 gc 完成工作并且不存在泄漏时所预期的那样。

如果你要检查go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap,就在ab跑之后

//img1.sycdn.imooc.com//62679d100001006125561233.jpg

它表明该应用程序正在使用177Mb内存。


其中大部分102Mb被net/http.Transport.getConn.


您的处理程序正在计算1Mb,其余的是需要的各种东西。


如果您要在服务器发布和 gc 之后截取屏幕截图,您会看到一个更小的图表。这里不演示。


现在让我们生成一个泄漏并再次使用这两个工具查看它。


在代码中取消注释,



func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    go func() {

        make(chan []byte) <- make([]byte, 10024)

    }()

//...

重新启动应用程序(按q,expvarmon虽然不是必需的)


go get -u github.com/divan/expvarmon

go run main.go &

go run k/main.go &

ab -n 50000 -c 2500 http://localhost:8080/y

# in a different window, for live preview

expvarmon -ports=6060 -i 500ms

表明

//img1.sycdn.imooc.com//62679d23000148b809030342.jpg

//img1.sycdn.imooc.com//62679d2b0001029709080364.jpg

expvarmon你可以看到相同的行为,只是数字发生了变化,而在静止状态,它被 gced 后,它仍然消耗了大量的内存,比一个 void golang http server 拿一个比较点要多得多。

同样,对堆进行截图,它显示您的处理程序现在正在消耗大部分内存 ~ 450Mb,注意箭头,它显示有 for 452mbof10kb分配和4.50Mbof 96b。它们分别对应于[]byte被推送到chan []byte.

//img1.sycdn.imooc.com//62679d3c0001b0d225540940.jpg

最后,您可以检查堆栈跟踪以查找死的 goroutine,从而导致内存泄漏,打开http://localhost:6060/debug/pprof/goroutine?debug=1


goroutine profile: total 50012


50000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281

#   0x76b85c    main.prepareRequest.func1+0x4c  /home/mh-cbon/gow/src/test/oom/main.go:101


4 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281

#   0x42b685    internal/poll.runtime_pollWait+0x55     /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182

#   0x4c3a3a    internal/poll.(*pollDesc).wait+0x9a     /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87

// more...

它告诉我们程序正在托管50 012goroutine,然后它按文件位置分组列出它们,其中第一个数字是运行实例的计数,50 000在本示例的第一组中。紧随其后的是导致 goroutine 存在的堆栈跟踪。

你可以看到有一堆系统的东西,在你的情况下,你不应该太担心它。

如果你的程序按你认为的那样工作,你必须寻找那些你认为不应该存在的人。

但是,总体而言,您的代码并不令人满意,并且可能并且可能应该通过对其分配和整体设计概念的彻底审查来改进。

** 这是对原始源代码所做更改的摘要。

  • 它添加了一个新程序k/main.go来充当后端服务器。

  • 它添加了_ "expvar"导入语句

  • 它启动 pprof 在init阶段期间注册的 std api HTTP 服务器实例go http.ListenAndServe("localhost:6060", nil)

  • 禁用客户端超时Timeout:   time.Duration(300) * time.Millisecond,,否则负载测试不返回 200s

  • 服务器地址设置为Addr: "localhost:8080",

  • urls在其中创建的值prepareRequest设置为 len=2 的静态列表

  • req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(它为{"body":"param"}添加了错误检查)))

  • 它禁用错误检查io.Copy(ioutil.Discard, response.Body)


查看完整回答
反对 回复 2022-04-26
?
精慕HU

TA贡献1845条经验 获得超8个赞

我已经通过将net/http包替换为fasthttp. 早些时候我没有使用它,因为我无法在 fasthttp 客户端上找到超时方法,但我看到确实有一种DoTimeout用于 fasthttp 客户端的方法,它在指定的持续时间后使请求超时。


这里更新的代码:


在vars.go中 ClientFastHttp *fasthttp.Client


main.go


package main


import (

    "./common"

    "crypto/tls"

    "fmt"

    "github.com/gorilla/mux"

    "github.com/valyala/fasthttp"

    "log"

    "math/rand"

    "net"

    "net/http"

    "net/http/pprof"

    "os"

    "runtime"

    "strconv"

    "sync"

    "time"

)


func init() {


    //Get Any command line argument passed

    args := os.Args[1:]

    numCPU := runtime.NumCPU()

    if len(args) > 1 {

        numCPU, _ = strconv.Atoi(args[0])

    }


    common.Transport = &http.Transport{

        TLSClientConfig: &tls.Config{

            InsecureSkipVerify: true,

        },

        DialContext: (&net.Dialer{

            //Timeout: time.Duration() * time.Millisecond,

            KeepAlive: 30 * time.Second,

        }).DialContext,

        //ForceAttemptHTTP2:     true,

        DisableKeepAlives: false,

        //MaxIdleConns:      0,

        //IdleConnTimeout:   0,

        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,

        //ExpectContinueTimeout: 1 * time.Second,

    }


    common.Client = &http.Client{

        Timeout:   time.Duration(300) * time.Millisecond,

        Transport: common.Transport,

    }


    runtime.GOMAXPROCS(numCPU)


    rand.Seed(time.Now().UTC().UnixNano())

}


func main() {


    router := mux.NewRouter().StrictSlash(true)

    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        _, _ = fmt.Fprintf(w, "Hello!!!")

    })


    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {

        vars := mux.Vars(r)


        prepareRequest(w, r, vars["name"])


    }).Methods("POST")


    // Register pprof handlers

    router.HandleFunc("/debug/pprof/", pprof.Index)

    router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

    router.HandleFunc("/debug/pprof/profile", pprof.Profile)

    router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)

    router.HandleFunc("/debug/pprof/trace", pprof.Trace)


    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")


    srv := &http.Server{

        Addr: "0.0.0.0:" + "80",

        /*ReadTimeout:  500 * time.Millisecond,

        WriteTimeout: 500 * time.Millisecond,

        IdleTimeout:  10 * time.Second,*/

        Handler: routerMiddleWare,

    }


    log.Fatal(srv.ListenAndServe())

}


func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {


    //other part of the code and call to goroutine

    var urls []string

    results, s, c := callUrls(urls)

    finalCall(w, results, s, c)


}


type Response struct {

    Status int

    Url    string

    Body   string

}


func callUrls(urls []string) ([]*Response, []string, []string) {

    var wg sync.WaitGroup

    wg.Add(len(urls))

    ch := make(chan func() (*Response, string, string), len(urls))

    for _, url := range urls {

        go func(url string) {

            //decide if request is valid for client to make http call using country/os

            isValid := true //assuming url to be called

            if isValid {

                //make post call

                //request body have many more paramter, just sample included.

                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.

                req := fasthttp.AcquireRequest()

                req.SetRequestURI(url)

                req.Header.Set("Content-Type", "application/json")

                req.Header.Set("Connection", "Keep-Alive")

                req.Header.SetMethod("POST")

                req.SetBody([]byte(`{"body":"param"}`))


                resp := fasthttp.AcquireResponse()


                defer fasthttp.ReleaseRequest(req)   // <- do not forget to release

                defer fasthttp.ReleaseResponse(resp) // <- do not forget to release


                //err := clientFastHttp.Do(req, response)

                //endregion

                t := time.Duration(300)


                err := common.ClientFastHttp.DoTimeout(req, resp, t*time.Millisecond)


                body := resp.Body()


                if err != nil {

                    wg.Done()

                    ch <- func() (*Response, string, string) {

                        return &Response{Status: 500, Url: url, Body: ""}, "error", "500"

                    }

                    return

                }


                /*defer response.Body.Close()

                body, _ := ioutil.ReadAll(response.Body)

                _, err = io.Copy(ioutil.Discard, response.Body)


                //Close the body, forced this

                //Also tried without defer, and only wothout following line

                response.Body.Close()*/


                //do something with response body replace a few string etc.

                //and return

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"

                }


            } else {

                wg.Done()

                ch <- func() (*Response, string, string) {

                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"

                }

            }


        }(url)

    }

    wg.Wait()

    var (

        results []*Response

        msg     []string

        status  []string

    )

    for {

        r, x, y := (<-ch)()

        if r != nil {


            results = append(results, r)

            msg = append(msg, x)

            status = append(status, y)

        }

        if len(results) == len(urls) {

            return results, msg, status

        }


    }

}


func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {

    fmt.Println("response", "response body", results, msg, status)

}


查看完整回答
反对 回复 2022-04-26
  • 2 回答
  • 0 关注
  • 226 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信