为了账号安全,请及时绑定邮箱和手机立即绑定

使用高郎持续检查 API 中是否存在数据变化

使用高郎持续检查 API 中是否存在数据变化

函数式编程 2022-09-12 16:20:21
我正在尝试轮询一个API以保留流量数据的时间序列,并在发生更改时将该数据保存到postgres。目前,我有一个这样的实现//this needs to check the api for new information every X secondsfunc Poll(req *http.Request, client *http.Client) ([]byte, error) {    r := rand.New(rand.NewSource(99))    c := time.Tick(10 * time.Second)    for _ = range c {        //Download the current contents of the URL and do something with it        response, err := client.Do(req)        data, _ := io.ReadAll(response.Body)        if err != nil {            return nil, err        }        return data, nil        // add a bit of jitter        jitter := time.Duration(r.Int31n(5000)) * time.Millisecond        time.Sleep(jitter)    }}func main() {    client := &http.Client{        Timeout: time.Second * 60 * 60 * 600,    }    url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"    req, err := http.NewRequest("GET", url, nil)    if err != nil {        return err    }    req.Header.Set("Ocp-Apim-Subscription-Key", "xx")    // response, err := client.Do(req)    data, err := Poll(req, client)    fmt.Println(string(data))}接下来,我将执行比较功能。基本上,我正在尝试找出如何确保循环首先调用查询并返回适当的值。我认为这个实现可能不是很好,我只是不确定如何真正正确地实现它。我能得到一些指点吗?
查看完整描述

2 回答

?
繁星淼淼

TA贡献1775条经验 获得超11个赞

您的问题呈现了一个典型的生产者/消费者场景,因为您的 Poll() 函数正在生成由 main() 函数使用的响应数据(可能是在 postgres 中保存数据)。通过使用 go 例程和通道可以很好地解决此问题。


轮询工作可以在一个 goroutine 中完成,该 goroutine 通过通道将响应数据传达给主函数。轮询工作时也可能出现错误(响应错误或 io 错误),因此也应将其传达给 main() 函数。


首先定义一个新类型来保存轮询的数据和一个错误:


type PollResponse struct {

    Data []byte

    Err error

}

在 Poll() 函数中,启动一个 go 例程来执行轮询工作,并返回一个通道来共享 go 例程之外的数据:


func Poll(req *http.Request, client *http.Client) (ch chan PollResponse){

    ch = make(chan PollResponse) // Buffered channel is also good

    go func() {

        defer func() {

            close(ch)

        }()

        r := rand.New(rand.NewSource(99))

        c := time.Tick(10 * time.Second)


        for range c {

            res, err := client.Do(req);

            pollRes := PollResponse {}

            if err != nil {

                pollRes.Data, pollRes.Err = nil, err

                ch <- pollRes

                break

            }

            pollRes.Data, pollRes.Err = io.ReadAll(res.Body)

            ch <- pollRes

            if pollRes.Err != nil {

                break

            }

            jitter := time.Duration(r.Int31n(5000)) * time.Millisecond

            time.Sleep(jitter)

        }

    }()

    return

}

最后在 main() 函数中,调用 Poll() 并读取通道以获得轮询响应:


func main() {

    client := &http.Client{

        Timeout: time.Second * 60 * 60 * 600,

    }

    url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"


    req, err := http.NewRequest("GET", url, nil)

    if err != nil {

        return

    }

    req.Header.Set("Ocp-Apim-Subscription-Key", "xx")


    pollCh := Poll(req, client)

    

    for item := range pollCh {

        if item.Err == nil {

            fmt.Println(string(item.Data)) // or save it to postgres database

        }       

    }

}


查看完整回答
反对 回复 2022-09-12
?
长风秋雁

TA贡献1757条经验 获得超7个赞

在股票代码通道上的范围。在每次迭代中,获取数据,检查数据是否已更改并处理数据。关键点是从循环内部处理数据,而不是从函数返回数据。


假设您具有以下函数:


// procesChangedData updates the database with new

// data from the API endpoint.

func processChangedData(data []byte) error {

    // implement save to postgress

}

使用以下函数进行轮询:


func Poll(client *http.Client) error {


    url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"


    // Use NewTicker instead of Tick so we can cleanup

    // ticker on return from the function.

    t := time.NewTicker(10 * time.Second)

    defer t.Stop()


    var prev []byte


    for _ = range t.C {


        // Create a new request objet for each request.

        req, err := http.NewRequest("GET", url, nil)

        if err != nil {

            return err

        }

        req.Header.Set("Ocp-Apim-Subscription-Key", "xx")


        resp, err := client.Do(req)

        if err != nil {

            // Edit error handling to match application 

            // requirements. I return an error here. Continuing

            // the loop is also an option.

            return err

        }


        data, err := io.ReadAll(resp.Body)


        // Ensure that body is closed before handling errors

        // below.

        resp.Body.Close()


        if err != nil {

            // Edit error handling to match application 

            // requirements. I return an error here. Continuing

            // the loop is also an option.

            return err

        }


        if resp.StatusCode != http.StatusOK {

            // Edit error handling to match application 

            // requirements. I return an error here. Continuing

            // the loop is also an option.

            return fmt.Errorf("bad status %d", resp.StatusCode)

        }


        if bytes.Equal(data, prev) {

            continue

        }

        prev = data


        if err := processChangedData(data); err != nil {

            // Edit error handling to match application 

            // requirements. I return an error here. Continuing

            // the loop is also an option.

            return err

        }

    }

    panic("unexpected break from loop")

}


查看完整回答
反对 回复 2022-09-12
  • 2 回答
  • 0 关注
  • 61 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信