3 回答
TA贡献1859条经验 获得超6个赞
你看错了书。本书使用该示例来说明如何使用缓冲通道来避免戈鲁丁泄漏。
这是紧跟在书中的例子之后的段落(第233页):
如果我们使用无缓冲的通道,两个较慢的戈鲁丁就会被困在一个没有戈鲁廷会接收到的频道上发送他们的响应。这种情况,称为戈鲁丁泄漏,将是一个错误。与垃圾变量不同,泄漏的戈鲁廷不会自动收集,因此确保戈鲁廷在不再需要时自行终止非常重要。
注意:
此函数不会尝试针对内存占用或资源使用(包括网络资源)进行优化。Go的软件包的客户端函数是上下文感知的,因此它可以在请求中途取消,这将节省一些资源(这是否对问题很重要将是设计决策)。
net/http
要使用上下文,您可以:
func mirroredQuery() string {
responses := make(chan string, 3)
ctx, cf := context.WithCancel(context.Background())
defer cf()
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(ctx context.Context, url string) string {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
panic(err)
}
res, err := http.DefaultClient.Do(req)
if err == nil {
body, err := io.ReadAll(res.Body)
if err == nil {
return string(body)
} else {
return err.Error()
}
} else {
return err.Error()
}
}
使用缓冲通道可分配内存。当戈鲁廷太多时,使用缓冲通道太浪费了。
要解决此问题,您可以使用通道(如您尝试的通道):
func getAny() string {
responses := make(chan string)
ctx, cf := context.WithCancel(context.Background())
defer cf()
done := make(chan struct{})
defer close(done)
doRequest := func(url string) {
select {
case responses <- request(ctx, url):
fmt.Printf("get %s\n", url)
case <-done:
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-responses // return the quickest response
}
在封闭频道上接收总是立即“返回”零值,因此充当广播。通常的做法是使用这种“完成通道”。
您还可以使用 :context.Context
func mirroredQuery() string {
responses := make(chan string)
ctx, cf := context.WithCancel(context.Background())
defer cf()
doRequest := func(url string) {
select {
case responses <- request(ctx, url):
fmt.Printf("get %s\n", url)
case <-ctx.Done():
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-responses // return the quickest response
}
在这种情况下,这更好,因为您已经将 与 http 一起使用。context.Context
使用将等待所有请求完成,但返回第一个请求。我认为这违背了函数的目的,并且几乎没有提供任何好处。而且我不认为在函数本身返回之前使所有生成的函数都返回是有意义的(除非该函数是主函数)。sync.WorkGroup
TA贡献1776条经验 获得超12个赞
为了避免泄露戈鲁廷,您可能希望确保一旦从 返回,没有最初在此函数中创建的戈鲁廷保持运行状态?mirroredQuery
在这种情况下,最重要的是当其他戈鲁廷之一设法成功执行请求时,能够取消它们。这种取消是在 Go 中使用实现的,它支持。context.Contextnet/http
一旦你有了上下文取消,你需要在主函数中有一个 来等待所有的戈鲁廷都是 。sync.WaitGroupDone
下面是一个使用上下文并包装本书函数的“HTTP 获取”功能:doRequestrequest
func doRequest(ctx context.Context, url string) string {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
log.Fatal(err)
}
res, err := http.DefaultClient.Do(req)
// err will be non-nil also if the request was canceled
if err != nil {
return ""
}
defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
return ""
}
return string(b)
}
http.DefaultClient.Do如果上下文被取消,将提前返回,并出现相应的错误。
现在,处理戈鲁丁的函数变为:
func mirroredQuery() string {
ctx, cancel := context.WithCancel(context.Background())
responses := make(chan string, 3)
fetcher := func(url string, wg *sync.WaitGroup) {
res := doRequest(ctx, url)
if res != "" {
responses <- res
}
wg.Done()
}
urls := []string{
"asia.gopl.io",
"europe.gopl.io",
"http://google.com",
}
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go fetcher(url, &wg)
}
res := <-responses
fmt.Println("got response", res[:300])
cancel()
wg.Wait()
return res
}
请注意以下几点:
每个 goroutine 都会运行,并且仅当结果为非空时才写入结果(这意味着没有发生错误;取消在此处计为错误)
doRequest
responses
用于等待所有工人戈鲁丁退出
WaitGroup
主戈鲁廷启动所有工人,然后等待第一个(非空)结果;然后,它调用取消上下文,该上下文指示所有工作线程退出,并等待它们完成。
responses
cancel
作为练习,扩展此代码以解决以下几个问题:
区分实际错误和取消;在当前代码中,如果所有工作线程都遇到错误,则可能存在死锁
使用 向主 goroutine 中的读取添加超时。
<- responses
select
编写代码以尽快将第一个结果返回给调用方,而后台 goroutine 可以处理取消上下文并等待工作线程退出。毕竟,这里的主要目标是快速返回结果。
TA贡献1842条经验 获得超21个赞
在提供的代码中没有戈鲁丁泄漏。该方法使用缓冲通道来收集结果并返回第一个答案。当前缓冲区有足够的空间来收集所有戈鲁丁的所有答案,即使从未读取其余响应也是如此。如果缓冲区小于 N - 1,则情况将发生变化,其中 N 是生成的戈鲁丁数。在这种情况下,生成的一些戈鲁丁将卡在尝试向通道发送响应时。重复调用将导致卡住的戈鲁廷增加,这可称为戈鲁丁泄漏。mirroredQuerymirroredQueryresponsesmirroredQuery
下面是包含已添加日志的代码以及两种方案的输出。
func mirroredQuery() string {
responses := make(chan string, 2)
go func() {
responses <- request("asia.gopl.io")
log.Printf("Finished goroutine asia.gopl.io\n")
}()
go func() {
responses <- request("europe.gopl.io")
log.Printf("Finished goroutine europe.gopl.io\n")
}()
go func() {
responses <- request("americas.gopl.io")
log.Printf("Finished goroutine americas.gopl.io\n")
}()
return <-responses // return the quickest response
}
func request(hostname string) (response string) {
duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
time.Sleep(duration)
return hostname
}
func main() {
rand.Seed(time.Now().UnixNano())
result := mirroredQuery()
log.Printf("Fastest result for %s\n", result)
time.Sleep(6*time.Second)
}
缓冲区大小 > = N-1 的输出
2021/06/26 16:05:27 Finished europe.gopl.io
2021/06/26 16:05:27 Fastest result for europe.gopl.io
2021/06/26 16:05:28 Finished asia.gopl.io
2021/06/26 16:05:30 Finished americas.gopl.io
Process finished with the exit code 0
缓冲区大小为 N-1 <输出
2021/06/26 15:47:54 Finished europe.gopl.io
2021/06/26 15:47:54 Fastest result for europe.gopl.io
Process finished with the exit code 0
上述实现可以通过在第一个响应到达时引入 goroutine 终止来“改进”。这可能会减少已用资源的数量。这在很大程度上取决于什么方法。对于计算繁重的场景,取消http请求可能会导致连接终止,因此下一个请求必须打开新的请求。对于高负载服务器,即使不使用响应,它也可能不如等待响应有效。request
下面是改进的用法实现。context
func mirroredQuery() string {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
responses := make(chan string)
f := func(hostname string) {
response, err := request(ctx, hostname)
if err != nil {
log.Printf("Finished %s with error %s\n", hostname, err)
return
}
responses <- response
log.Printf("Finished %s\n", hostname)
}
go f("asia.gopl.io")
go f("europe.gopl.io")
go f("americas.gopl.io")
return <-responses // return the quickest response
}
func request(ctx context.Context, hostname string) (string, error) {
duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
after := time.After(duration)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-after:
return "response for "+hostname, nil
}
}
func main() {
rand.Seed(time.Now().UnixNano())
result := mirroredQuery()
log.Printf("Fastest result for %s\n", result)
time.Sleep(6 * time.Second)
}
- 3 回答
- 0 关注
- 85 浏览
添加回答
举报