为了账号安全,请及时绑定邮箱和手机立即绑定

Go:学习通道和排队,致命错误

Go:学习通道和排队,致命错误

Go
aluckdog 2021-12-13 10:45:37
我正在尝试学习如何使用通道在 Go 中为我的其他项目之一创建队列。我的另一个项目基本上是对数据库行进行排队,然后使用行中的详细信息对数据库进行数字运算。我不希望工作人员同时处理同一行,因此它需要检查工作人员当前是否正在处理该特定行 ID,如果是,则等待它完成。如果不是同一个row ID,可以异步运行,但是我也想限制可以同时运行的异步worker的数量。在我下面的代码中,我目前试图将其限制为三名工人。这是我所拥有的:package mainimport (    "log"    "strconv"    "time")// RowInfo holds the job infotype RowInfo struct {    id int}// WorkerCount holds how many workers are currently runningvar WorkerCount int// WorkerLocked specifies whether a row ID is currently processing by a workervar WorkerLocked map[string]bool// Process the RowInfofunc worker(row RowInfo) {    rowID := strconv.Itoa(row.id)    WorkerCount++    WorkerLocked[rowID] = true    time.Sleep(1 * time.Second)    log.Printf("ID rcvd: %d", row.id)    WorkerLocked[rowID] = false    WorkerCount--}// waiter will check if the row is already processing in a worker// Block until it finishes completion, then dispatchfunc waiter(row RowInfo) {    rowID := strconv.Itoa(row.id)    for WorkerLocked[rowID] == true {        time.Sleep(1 * time.Second)    }    go worker(row)}func main() {    jobsQueue := make(chan RowInfo, 10)    WorkerLocked = make(map[string]bool)    // Dispatcher waits for jobs on the channel and dispatches to waiter    go func() {        // Wait for a job        for {            // Only have a max of 3 workers running asynch at a time            for WorkerCount > 3 {                time.Sleep(1 * time.Second)            }            job := <-jobsQueue            go waiter(job)        }    }()    // Test the queue, send some data    for i := 0; i < 12; i++ {        r := RowInfo{            id: i,        }        jobsQueue <- r    }    // Prevent exit!    for {        time.Sleep(1 * time.Second)    }}
查看完整描述

2 回答

?
ABOUTYOU

TA贡献1812条经验 获得超5个赞

如果您要使用WorkerLocked地图,则需要使用sync包保护对其的访问。您还需要以WorkerCount相同的方式(或使用原子操作)进行保护。这样做也会使睡眠变得不必要(使用条件变量)。

更好的是,让 3 个(或多个)工作人员等待行使用通道处理。然后,您会将行分配给各个工作人员,以便特定的工作人员始终处理特定的行(例如,使用 row.id % 3 来确定将行发送到哪个工作人员/通道)。


查看完整回答
反对 回复 2021-12-13
?
幕布斯6054654

TA贡献1876条经验 获得超7个赞

我强烈建议不要在这种情况下使用任何锁定,其中您有处理从数据库读取的工作人员。锁和信号量一般会导致很多问题,最终会给你留下一堆损坏的数据。相信我。去过也做过。在这种情况下,您需要小心并避免使用它们。例如,如果您希望保留数据和维护地图但不用于实际处理,则锁定是很好的。

通过锁定 go 例程,你会不必要地减慢你的 go 程序。Go 旨在尽可能快地处理事情。不要压着他。

这是我自己的一些理论,可以帮助您更好地理解我想说的内容:

  • 为了处理工人限制为 3。只需生成 3 个不同的从队列中选择的 goroutine。Worker 永远不会从频道接受相同的工作,所以你在这里很安全。

  • make() 已经完成了内部通道限制,可以很好地在这种情况下使用。该通道限制是实际的第二个参数。所以如果你写

    队列 := make(chan RowInfo, 10)

    这意味着这个队列最多可以占用10个 RowInfo。如果聚合到此队列中的循环达到 10 个,它将锁定并等待工作人员从通道中释放一项。因此,一旦队列达到 9,数据库聚合器将写入第 10 个,而 worker 将取出第 10 个。


通过这种方式,您可以拥有 golang 的自然工作流程:) 这也称为生成pre-workers

package main


import (

    "fmt"

    "os"

    "os/signal"

    "syscall"

    "time"

)


// RowInfo holds the job info

type RowInfo struct {

    ID int

}


func worker(queue chan RowInfo, done chan bool) {

    fmt.Println("Starting worker...")


    for {

        select {

        case row := <-queue:

            fmt.Printf("Got row info: %v \n", row)

            // Keep it for second so we can see actual queue lock working

            time.Sleep(1 * time.Second)


        case <-time.After(10 * time.Second):

            fmt.Printf("This job is taking way too long. Let's clean it up now by lets say write write in database that job has failed so it can be restarted again when time is right.")

        case <-done:

            fmt.Printf("Got quit signal... Killing'em all")

            break

        }

    }

}


func handleSigterm(kill chan os.Signal, done chan bool) {

    select {

    case _ = <-kill:

        close(done)

    }

}


func main() {


    // Do not allow more than 10 records to be in the channel.

    queue := make(chan RowInfo, 10)

    done := make(chan bool)


    kill := make(chan os.Signal, 1)


    signal.Notify(kill, os.Interrupt)

    signal.Notify(kill, syscall.SIGTERM)


    go handleSigterm(kill, done)


    for i := 0; i < 3; i++ {

        go worker(queue, done)

    }


    // Should be infinite loop in the end...

    go func() {

        for i := 0; i < 100; i++ {

            fmt.Printf("Queueing: %v \n", i)

            queue <- RowInfo{ID: i}

        }

    }()


    <-done

    // Give it some time to process things before shutting down. This is bad way of doing things

    // but is efficient for this example

    time.Sleep(5 * time.Second)

}

至于管理数据库状态,您可以在数据库中显示“进行中”的状态,因此每次选择您时,也要对该行进行更新,以表明正在进行中。这当然是一种方法。通过在 golang 中保留某种映射,我会说你会比需要的更多地折磨你的服务。


查看完整回答
反对 回复 2021-12-13
  • 2 回答
  • 0 关注
  • 145 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信