为了账号安全,请及时绑定邮箱和手机立即绑定

跟踪长时间运行任务的进度 - 正确方法

跟踪长时间运行任务的进度 - 正确方法

Go
天涯尽头无女友 2022-12-26 10:49:01
我想跟踪一些长时间运行的进程的执行并显示用户完成百分比和错误(如果有)。如果这是一个长时间运行的过程,那么很容易——您可以为进度(百分比)和错误创建渠道。当我们有 X 个长时间运行的进程时,实现这种逻辑的正确方法是什么?下面是一段有效的代码,但我不太喜欢它的实现方式。我创建了一个结构ProgressTracker,将Url(作为一个字段)Error,Progress 作为通道。我将其保存ProgressTracker在一个切片中,一旦我提交了所有任务,我就会遍历切片ProgressTracker并收听每个trackerin的频道ProgressTracker。一旦提交的请求数 == 收到的响应数 - 退出循环。它是 Go 惯用的解决方案吗?作为通道传递给函数会更容易ProgressTracker,但我不知道在这种情况下如何正确发送“进度”、“错误”和“完成”事件。代码如下,Go playground 中也有同样的代码:https ://go.dev/play/p/f3hXJsZR9WV
查看完整描述

2 回答

?
Helenr

TA贡献1780条经验 获得超3个赞

你陷入僵局是因为你继续select反对那些没有违约的跟踪器。您的内部for循环每次都会迭代所有跟踪器,其中包括已完成且永远不会发送另一条消息的跟踪器。解决这个问题的最简单方法是 empty default,这也会使它们在现实生活中表现得更好,因为它们并非都以相同的速度运行,但它确实会将其变成一个会消耗更多 CPU 的紧密循环。

WaitGroup什么都不做;你正在调用Wait一个 goroutine,但当它返回时什么也不做,而且你永远不会调用Done它正在跟踪的 goroutine,所以它永远不会返回。相反,您单独跟踪Complete收到的消息数量并使用它而不是 WaitGroup;目前尚不清楚为什么以这种方式实施。

修复两者可解决所述问题:https ://go.dev/play/p/do0g9jrX0mY

但是,这可能不是正确的方法。不可能用一个人为的例子来说明正确的方法是什么;如果这个例子就是它需要做的全部,你不需要任何逻辑,你可以把你的打印语句放在工作人员中,只使用一个等待组而不使用任何通道就可以完成它。假设你实际上正在对结果做某事,你可能想要一个单一的Completed通道和一个Error由所有工作人员共享的单一通道,并且可能需要一个完全不同的机制来跟踪进度,比如你可以从中读取的原子 int/float想知道目前的进展。那么你不需要嵌套循环的东西,你只需要一个循环select从共享频道读取消息。这完全取决于打算使用此代码的上下文。


查看完整回答
反对 回复 2022-12-26
?
RISEBY

TA贡献1856条经验 获得超5个赞

我想出了这种方法,它适用于我的需要:


package main


import (

    "errors"

    "fmt"

    "strings"

    "sync"

    "time"

)


type ProgressTracker struct {

    Progress  int

    Error     error

    Completed bool

    Url       string

}


/**

This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel

For .net sites on 3rd iteration fail with error

When everything is completed, send a message to Complete channel

*/

func work(url string, tracker chan ProgressTracker) {

    var internalTracker = ProgressTracker{

        Url: url,

    }

    tracker <- internalTracker

    fmt.Printf("processing url %s\n", url)

    for i := 1; i <= 5; i++ {

        if url == "google.com" {

            time.Sleep(time.Second * 3)

        }

        time.Sleep(time.Second)

        if i == 3 && strings.HasSuffix(url, ".net") {

            internalTracker.Error = errors.New("error for .net sites")

            internalTracker.Completed = true

            tracker <- internalTracker

            return

        }

        progress := 20 * i

        internalTracker.Progress = progress

        internalTracker.Completed = false

        tracker <- internalTracker

    }

    internalTracker.Completed = true

    tracker <- internalTracker

}


func main() {

    var urls = []string{"google.com", "youtube.com", "someurl.net"}

    var tracker = make(chan ProgressTracker, len(urls))

    var wg sync.WaitGroup

    wg.Add(len(urls))


    for _, url := range urls {

        go func(workUrl string) {

            defer wg.Done()

            work(workUrl, tracker)

        }(url)

    }


    go func() {

        wg.Wait()

        close(tracker)

        fmt.Printf("After wg wait")

    }()


    var completed = 0


    for completed < len(urls) {

        select {

        case t := <-tracker:

            if t.Completed {

                fmt.Printf("Processing for %s is completed!\n", t.Url)

                completed = completed + 1

            } else {

                fmt.Printf("Processing for %s is in progress: %d\n", t.Url, t.Progress)

            }

            if t.Error != nil {

                fmt.Printf("Url %s has errors %s\n", t.Url, t.Error)

            }

        }


    }

}

在这里,我ProgressTracker作为通道传递(中的字段ProgressTracker被声明为简单字段,而不是通道)并且在来自工作函数的每个事件上返回正在发生的事情的完整状态(如果进度增加 - 设置新值并将结构返回通道,如果发生错误 - 设置错误并返回结构等)。


查看完整回答
反对 回复 2022-12-26
  • 2 回答
  • 0 关注
  • 87 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信