为了账号安全,请及时绑定邮箱和手机立即绑定

golang生产者消费者接收到的消息数

golang生产者消费者接收到的消息数

Go
开满天机 2022-10-24 16:06:26
我在 golang 中编写了生产者-消费者模式。读取多个 csv 文件并处理记录。我正在一口气读取 csv 文件的所有记录。我想以包括所有 csv 文件在内的总记录的 5% 的间隔记录处理完成的百分比。例如,我有 3 个 csv 要处理,每个有 20、30、50 行/记录(因此总共要处理 100 条记录)想要在处理 5 条记录时记录进度。func processData(inputCSVFiles []string) {    producerCount := len(inputCSVFiles)    consumerCount := producerCount    link := make(chan []string, 100)    wp := &sync.WaitGroup{}    wc := &sync.WaitGroup{}    wp.Add(producerCount)    wc.Add(consumerCount)    for i := 0; i < producerCount; i++ {        go produce(link, inputCSVFiles[i], wp)    }    for i := 0; i < consumerCount; i++ {        go consume(link, wc)    }    wp.Wait()    close(link)    wc.Wait()    fmt.Println("Completed data migration process for all CSV data files.")}func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {    defer wg.Done()    records := readCsvFile(filePath)    totalNumberOfRecords := len(records)    for _, record := range records {        link <- record    }}func consume(link <-chan []string, wg *sync.WaitGroup) {    defer wg.Done()    for record := range link {        // process csv record    }}
查看完整描述

1 回答

?
浮云间

TA贡献1829条经验 获得超4个赞

我使用了原子变量和计数器通道,其中消费者将在处理记录时推送计数,其他 goroutine 将从通道中读取并计算总处理记录百分比。


var progressPercentageStep float64 = 5.0

var totalRecordsToProcess int32


func processData(inputCSVFiles []string) {

        producerCount := len(inputCSVFiles)

        consumerCount := producerCount

        link := make(chan []string, 100)

        counter := make(chan int, 100)

        defer close(counter)

        wp := &sync.WaitGroup{}

        wc := &sync.WaitGroup{}

    

        wp.Add(producerCount)

        wc.Add(consumerCount)

    

        for i := 0; i < producerCount; i++ {

            go produce(link, inputCSVFiles[i], wp)

        }


        go progressStats(counter)


        for i := 0; i < consumerCount; i++ {

            go consume(link, wc)

        }

        wp.Wait()

        close(link)

        wc.Wait()

        

    }

    

    func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {

        defer wg.Done()

        records := readCsvFile(filePath)

        atomic.AddInt32(&totalRecordsToProcess, int32(len(records)))

        for _, record := range records {

            link <- record

        }

    }

    

    func consume(link <-chan []string,counter chan<- int, wg *sync.WaitGroup) {

        defer wg.Done()

        for record := range link {

            // process csv record

            counter <- 1

        }

    }

    

func progressStats(counter <-chan int) {

    var feedbackThreshold = progressPercentageStep

    for count := range counter {

        totalRemaining := atomic.AddInt32(&totalRecordsToProcess, -count)

        donePercent := 100.0 * processed / totalRemaining

        // log progress

        if donePercent >= feedbackThreshold {

            log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", totalRecordsToProcess, processed, donePercent)

            feedbackThreshold += progressPercentageStep

        }

    }

}


查看完整回答
反对 回复 2022-10-24
  • 1 回答
  • 0 关注
  • 102 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信