2 回答
TA贡献1744条经验 获得超4个赞
这段代码没有泄露。
为了演示,让我们稍微更新一下它,以便该帖子可以重现。
主程序
package main
import (
"bytes"
"crypto/tls"
_ "expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"strconv"
"sync"
"time"
"github.com/gorilla/mux"
)
var (
//http client
Client *http.Client
//http Transport
Transport *http.Transport
)
func init() {
go http.ListenAndServe("localhost:6060", nil)
//Get Any command line argument passed
args := os.Args[1:]
numCPU := runtime.NumCPU()
if len(args) > 1 {
numCPU, _ = strconv.Atoi(args[0])
}
Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
DialContext: (&net.Dialer{
//Timeout: time.Duration() * time.Millisecond,
KeepAlive: 30 * time.Second,
}).DialContext,
//ForceAttemptHTTP2: true,
DisableKeepAlives: false,
//MaxIdleConns: 0,
//IdleConnTimeout: 0,
//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
//ExpectContinueTimeout: 1 * time.Second,
}
Client = &http.Client{
// Timeout: time.Duration(300) * time.Millisecond,
Transport: Transport,
}
runtime.GOMAXPROCS(numCPU)
rand.Seed(time.Now().UTC().UnixNano())
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = fmt.Fprintf(w, "Hello!!!")
})
router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
prepareRequest(w, r, vars["name"])
}).Methods("POST", "GET")
// Register pprof handlers
// router.HandleFunc("/debug/pprof/", pprof.Index)
// router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
// router.HandleFunc("/debug/pprof/profile", pprof.Profile)
// router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
// router.HandleFunc("/debug/pprof/trace", pprof.Trace)
routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")
srv := &http.Server{
Addr: "localhost:8080",
/*ReadTimeout: 500 * time.Millisecond,
WriteTimeout: 500 * time.Millisecond,
IdleTimeout: 10 * time.Second,*/
Handler: routerMiddleWare,
}
log.Fatal(srv.ListenAndServe())
}
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
// go func() {
// make(chan []byte) <- make([]byte, 10024)
// }()
//other part of the code and call to goroutine
var urls []string
urls = append(urls,
"http://localhost:7000/",
"http://localhost:7000/",
)
results, s, c := callUrls(urls)
finalCall(w, results, s, c)
}
type Response struct {
Status int
Url string
Body string
}
func callUrls(urls []string) ([]*Response, []string, []string) {
var wg sync.WaitGroup
wg.Add(len(urls))
ch := make(chan func() (*Response, string, string), len(urls))
for _, url := range urls {
go func(url string) {
//decide if request is valid for client to make http call using country/os
isValid := true //assuming url to be called
if isValid {
//make post call
//request body have many more paramter, just sample included.
//instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.
req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))
if err != nil {
wg.Done()
ch <- func() (*Response, string, string) {
return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"
}
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Connection", "Keep-Alive")
//req.Close = true
response, err := Client.Do(req)
if err != nil {
wg.Done()
ch <- func() (*Response, string, string) {
return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"
}
return
}
defer response.Body.Close()
body, _ := ioutil.ReadAll(response.Body)
io.Copy(ioutil.Discard, response.Body)
//Close the body, forced this
//Also tried without defer, and only wothout following line
response.Body.Close()
//do something with response body replace a few string etc.
//and return
wg.Done()
ch <- func() (*Response, string, string) {
return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"
}
} else {
wg.Done()
ch <- func() (*Response, string, string) {
return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
}
}
}(url)
}
wg.Wait()
var (
results []*Response
msg []string
status []string
)
for {
r, x, y := (<-ch)()
if r != nil {
results = append(results, r)
msg = append(msg, x)
status = append(status, y)
}
if len(results) == len(urls) {
return results, msg, status
}
}
}
func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
fmt.Println("response", "response body", results, msg, status)
}
k/main.go
package main
import "net/http"
func main() {
y := make([]byte, 100)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(y)
})
http.ListenAndServe(":7000", nil)
}
安装额外的可视化工具,并用来ab模拟一些负载,它就可以完成直观的演示。
go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms
此时您会读取 的输出expvarmon,如果它是实时的,您会看到类似的内容
你可以看到那些东西在挥舞,GC 正在积极工作。
应用程序已加载,内存正在消耗,等待服务器释放其 conn 并等待 gc 清理它们
您可以看到memstats.Alloc
, memstats.HeapAlloc
,memstats.HeapInuse
现在减少了,正如 GC 完成其工作时所预期的那样,并且不存在泄漏。
如果你要在运行go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap
后立即检查ab
它表明该应用程序正在使用177Mb内存。
其中大部分102Mb正在被使用net/http.Transport.getConn。
你的经纪人正在负责1Mb,剩下的就是需要做的各种事情。
如果您在服务器和 gc 发布后截取屏幕截图,您会看到一个更小的图表。这里就不演示了。
现在让我们生成一个泄漏并再次使用这两个工具查看它。
在代码中取消注释,
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
go func() {
make(chan []byte) <- make([]byte, 10024)
}()
//...
重新启动应用程序(按q,expvarmon但不是必需的)
go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms
表明
您expvarmon
可以看到相同的行为,只有数字发生了变化,并且在静止状态下,在被 gced 后,它仍然消耗大量内存,比作为比较点的 void golang http 服务器要多得多。
再次,截取堆的屏幕截图,它显示您的处理程序现在正在消耗大部分内存 ~ 450Mb
,注意箭头,它显示有 for452mb
分配10kb
和4.50Mb
of 96b
。它们分别对应于[]byte
被推送到的切片chan []byte
。
最后,您可以检查堆栈跟踪以查找死 goroutine,从而泄漏内存,打开http://localhost:6060/debug/pprof/goroutine?debug=1
goroutine profile: total 50012
50000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281
# 0x76b85c main.prepareRequest.func1+0x4c /home/mh-cbon/gow/src/test/oom/main.go:101
4 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281
# 0x42b685 internal/poll.runtime_pollWait+0x55 /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182
# 0x4c3a3a internal/poll.(*pollDesc).wait+0x9a /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87
// more...
它告诉我们程序正在托管goroutine,然后它按文件位置分组列出它们,其中第一个数字是本示例第一组中50 012
正在运行的实例的计数。50 000
接下来是导致 goroutine 存在的堆栈跟踪。
您可以看到有很多系统问题,就您的情况而言,您不必太担心。
您必须寻找那些您认为如果您的程序按您认为应该的方式运行则不应运行的程序。
然而,总体而言,您的代码并不令人满意,并且可以并且可能应该通过对其分配和总体设计概念进行彻底审查来改进。
** 这是对原始源代码所做的更改的摘要。
它添加了一个新程序
k/main.go
来充当后端服务器。它添加了
_ "expvar"
导入语句init
它启动 pprof 在阶段期间注册的 std api HTTP 服务器实例go http.ListenAndServe("localhost:6060", nil)
禁用客户端超时
Timeout: time.Duration(300) * time.Millisecond,
,否则负载测试不会返回200s服务器地址设置为
Addr: "localhost:8080",
urls
其中创建的值设置prepareRequest
为 len=2 的静态列表req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(
它添加了对{"body":"param"}的错误检查)))
它禁用错误检查
io.Copy(ioutil.Discard, response.Body)
TA贡献2039条经验 获得超7个赞
我已经通过用 替换net/http
package解决了这个问题fasthttp
。早些时候我没有使用它,因为我无法在 fasthttp 客户端上找到超时方法,但我发现DoTimeout
fasthttp 客户端确实有一种方法,可以在指定的持续时间后使请求超时。
这里是更新的代码:
在vars.go中 ClientFastHttp *fasthttp.Client
主程序
package main
import (
"./common"
"crypto/tls"
"fmt"
"github.com/gorilla/mux"
"github.com/valyala/fasthttp"
"log"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"strconv"
"sync"
"time"
)
func init() {
//Get Any command line argument passed
args := os.Args[1:]
numCPU := runtime.NumCPU()
if len(args) > 1 {
numCPU, _ = strconv.Atoi(args[0])
}
common.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
DialContext: (&net.Dialer{
//Timeout: time.Duration() * time.Millisecond,
KeepAlive: 30 * time.Second,
}).DialContext,
//ForceAttemptHTTP2: true,
DisableKeepAlives: false,
//MaxIdleConns: 0,
//IdleConnTimeout: 0,
//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
//ExpectContinueTimeout: 1 * time.Second,
}
common.Client = &http.Client{
Timeout: time.Duration(300) * time.Millisecond,
Transport: common.Transport,
}
runtime.GOMAXPROCS(numCPU)
rand.Seed(time.Now().UTC().UnixNano())
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = fmt.Fprintf(w, "Hello!!!")
})
router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
prepareRequest(w, r, vars["name"])
}).Methods("POST")
// Register pprof handlers
router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/trace", pprof.Trace)
routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")
srv := &http.Server{
Addr: "0.0.0.0:" + "80",
/*ReadTimeout: 500 * time.Millisecond,
WriteTimeout: 500 * time.Millisecond,
IdleTimeout: 10 * time.Second,*/
Handler: routerMiddleWare,
}
log.Fatal(srv.ListenAndServe())
}
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
//other part of the code and call to goroutine
var urls []string
results, s, c := callUrls(urls)
finalCall(w, results, s, c)
}
type Response struct {
Status int
Url string
Body string
}
func callUrls(urls []string) ([]*Response, []string, []string) {
var wg sync.WaitGroup
wg.Add(len(urls))
ch := make(chan func() (*Response, string, string), len(urls))
for _, url := range urls {
go func(url string) {
//decide if request is valid for client to make http call using country/os
isValid := true //assuming url to be called
if isValid {
//make post call
//request body have many more paramter, just sample included.
//instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.
req := fasthttp.AcquireRequest()
req.SetRequestURI(url)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Connection", "Keep-Alive")
req.Header.SetMethod("POST")
req.SetBody([]byte(`{"body":"param"}`))
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req) // <- do not forget to release
defer fasthttp.ReleaseResponse(resp) // <- do not forget to release
//err := clientFastHttp.Do(req, response)
//endregion
t := time.Duration(300)
err := common.ClientFastHttp.DoTimeout(req, resp, t*time.Millisecond)
body := resp.Body()
if err != nil {
wg.Done()
ch <- func() (*Response, string, string) {
return &Response{Status: 500, Url: url, Body: ""}, "error", "500"
}
return
}
/*defer response.Body.Close()
body, _ := ioutil.ReadAll(response.Body)
_, err = io.Copy(ioutil.Discard, response.Body)
//Close the body, forced this
//Also tried without defer, and only wothout following line
response.Body.Close()*/
//do something with response body replace a few string etc.
//and return
wg.Done()
ch <- func() (*Response, string, string) {
return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"
}
} else {
wg.Done()
ch <- func() (*Response, string, string) {
return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
}
}
}(url)
}
wg.Wait()
var (
results []*Response
msg []string
status []string
)
for {
r, x, y := (<-ch)()
if r != nil {
results = append(results, r)
msg = append(msg, x)
status = append(status, y)
}
if len(results) == len(urls) {
return results, msg, status
}
}
}
func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
fmt.Println("response", "response body", results, msg, status)
}
- 2 回答
- 0 关注
- 171 浏览
添加回答
举报