为了账号安全,请及时绑定邮箱和手机立即绑定

从输入通道正确批处理项目

从输入通道正确批处理项目

Go
繁星coding 2023-05-22 15:38:25
用例我想在通过通道接收的 MySQL 数据库中保存大量数据。出于性能原因,我以 10 件为一组处理它们。我每 3 小时才收到一次输入项目。问题假设我得到 10004 个项目,将剩下 4 个项目,因为我的 go 例程在批量“冲走”之前等待 10 个项目。我想确保它创建一个少于 10 个项目的批次,以防该通道中没有更多项目(然后生产者也关闭了通道)。代码:// ProcessAudits sends the given audits in batches to SQLfunc ProcessAudits(done <-chan bq.Audit) {    var audits []bq.Audit    for auditRow := range done {        user := auditRow.UserID.StringVal        log.Infof("Received audit %s", user)        audits = append(audits, auditRow)        if len(audits) == 10 {            upsertBigQueryAudits(audits)            audits = []bigquery.Audit{}        }    }}我是 Go 的新手,我不确定如何正确实施它?
查看完整描述

2 回答

?
红颜莎娜

TA贡献1842条经验 获得超12个赞

这是一个工作示例。当通道关闭时,范围退出,因此您可以在循环后处理任何剩余的项目。


package main


import (

    "fmt"

    "sync"

)


type Audit struct {

    ID int

}


func upsertBigQueryAudits(audits []Audit) {

    fmt.Printf("Processing batch of %d\n", len(audits))

    for _, a := range audits {

        fmt.Printf("%d ", a.ID)

    }

    fmt.Println()

}


func processAudits(audits <-chan Audit, batchSize int) {

    var batch []Audit

    for audit := range audits {

        batch = append(batch, audit)

        if len(batch) == batchSize {

            upsertBigQueryAudits(batch)

            batch = []Audit{}

        }

    }

    if len(batch) > 0 {

        upsertBigQueryAudits(batch)

    }

}


func produceAudits(x int, to chan Audit) {

    for i := 0; i < x; i++ {

        to <- Audit{

            ID: i,

        }

    }

}


const batchSize = 10


func main() {

    var wg sync.WaitGroup

    audits := make(chan Audit)

    wg.Add(1)

    go func() {

        defer wg.Done()

        processAudits(audits, batchSize)

    }()

    wg.Add(1)

    go func() {

        defer wg.Done()

        produceAudits(25, audits)

        close(audits)

    }()

    wg.Wait()

    fmt.Println("Complete")

}

输出:


Processing batch of 10

0 1 2 3 4 5 6 7 8 9

Processing batch of 10

10 11 12 13 14 15 16 17 18 19

Processing batch of 5

20 21 22 23 24

Complete


查看完整回答
反对 回复 2023-05-22
?
拉莫斯之舞

TA贡献1820条经验 获得超10个赞

您也可以使用定时器。在这里玩例子https://play.golang.org/p/0atlGVCL-px

func printItems(items []int) {

    fmt.Println(items)

    return

}


func main() {

    

    items := []int {1,2,3,4,5,6,7,8}

    ch := make(chan int, 5)


    go func(ch <-chan int) {


        timer := time.NewTimer(1 * time.Second)

        temp := make([]int, 0, 5)


        for {


            select {

            case i := <- ch:

                timer.Reset(1 * time.Second)

                temp = append(temp, i)

                if len(temp) == 5 {

                    printItems(temp)

                    temp = []int {}

                }

            case <- timer.C:

                printItems(temp)

                temp = []int {}

            }


        }


    }(ch)

    

    for k, i := range items {

        fmt.Println("Send ", i)

        ch <- i

        if k == 7 {

            time.Sleep(5 * time.Second)

        }

    }


}


查看完整回答
反对 回复 2023-05-22
  • 2 回答
  • 0 关注
  • 128 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信