2 回答

TA贡献1775条经验 获得超11个赞
您的问题呈现了一个典型的生产者/消费者场景,因为您的 Poll() 函数正在生成由 main() 函数使用的响应数据(可能是在 postgres 中保存数据)。通过使用 go 例程和通道可以很好地解决此问题。
轮询工作可以在一个 goroutine 中完成,该 goroutine 通过通道将响应数据传达给主函数。轮询工作时也可能出现错误(响应错误或 io 错误),因此也应将其传达给 main() 函数。
首先定义一个新类型来保存轮询的数据和一个错误:
type PollResponse struct {
Data []byte
Err error
}
在 Poll() 函数中,启动一个 go 例程来执行轮询工作,并返回一个通道来共享 go 例程之外的数据:
func Poll(req *http.Request, client *http.Client) (ch chan PollResponse){
ch = make(chan PollResponse) // Buffered channel is also good
go func() {
defer func() {
close(ch)
}()
r := rand.New(rand.NewSource(99))
c := time.Tick(10 * time.Second)
for range c {
res, err := client.Do(req);
pollRes := PollResponse {}
if err != nil {
pollRes.Data, pollRes.Err = nil, err
ch <- pollRes
break
}
pollRes.Data, pollRes.Err = io.ReadAll(res.Body)
ch <- pollRes
if pollRes.Err != nil {
break
}
jitter := time.Duration(r.Int31n(5000)) * time.Millisecond
time.Sleep(jitter)
}
}()
return
}
最后在 main() 函数中,调用 Poll() 并读取通道以获得轮询响应:
func main() {
client := &http.Client{
Timeout: time.Second * 60 * 60 * 600,
}
url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return
}
req.Header.Set("Ocp-Apim-Subscription-Key", "xx")
pollCh := Poll(req, client)
for item := range pollCh {
if item.Err == nil {
fmt.Println(string(item.Data)) // or save it to postgres database
}
}
}

TA贡献1757条经验 获得超7个赞
在股票代码通道上的范围。在每次迭代中,获取数据,检查数据是否已更改并处理数据。关键点是从循环内部处理数据,而不是从函数返回数据。
假设您具有以下函数:
// procesChangedData updates the database with new
// data from the API endpoint.
func processChangedData(data []byte) error {
// implement save to postgress
}
使用以下函数进行轮询:
func Poll(client *http.Client) error {
url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
// Use NewTicker instead of Tick so we can cleanup
// ticker on return from the function.
t := time.NewTicker(10 * time.Second)
defer t.Stop()
var prev []byte
for _ = range t.C {
// Create a new request objet for each request.
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}
req.Header.Set("Ocp-Apim-Subscription-Key", "xx")
resp, err := client.Do(req)
if err != nil {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return err
}
data, err := io.ReadAll(resp.Body)
// Ensure that body is closed before handling errors
// below.
resp.Body.Close()
if err != nil {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return err
}
if resp.StatusCode != http.StatusOK {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return fmt.Errorf("bad status %d", resp.StatusCode)
}
if bytes.Equal(data, prev) {
continue
}
prev = data
if err := processChangedData(data); err != nil {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return err
}
}
panic("unexpected break from loop")
}
添加回答
举报