2 回答
TA贡献1812条经验 获得超5个赞
如果您要使用WorkerLocked
地图,则需要使用sync
包保护对其的访问。您还需要以WorkerCount
相同的方式(或使用原子操作)进行保护。这样做也会使睡眠变得不必要(使用条件变量)。
更好的是,让 3 个(或多个)工作人员等待行使用通道处理。然后,您会将行分配给各个工作人员,以便特定的工作人员始终处理特定的行(例如,使用 row.id % 3 来确定将行发送到哪个工作人员/通道)。
TA贡献1876条经验 获得超7个赞
我强烈建议不要在这种情况下使用任何锁定,其中您有处理从数据库读取的工作人员。锁和信号量一般会导致很多问题,最终会给你留下一堆损坏的数据。相信我。去过也做过。在这种情况下,您需要小心并避免使用它们。例如,如果您希望保留数据和维护地图但不用于实际处理,则锁定是很好的。
通过锁定 go 例程,你会不必要地减慢你的 go 程序。Go 旨在尽可能快地处理事情。不要压着他。
这是我自己的一些理论,可以帮助您更好地理解我想说的内容:
为了处理工人限制为 3。只需生成 3 个不同的从队列中选择的 goroutine。Worker 永远不会从频道接受相同的工作,所以你在这里很安全。
make() 已经完成了内部通道限制,可以很好地在这种情况下使用。该通道限制是实际的第二个参数。所以如果你写
队列 := make(chan RowInfo, 10)
这意味着这个队列最多可以占用10个 RowInfo。如果聚合到此队列中的循环达到 10 个,它将锁定并等待工作人员从通道中释放一项。因此,一旦队列达到 9,数据库聚合器将写入第 10 个,而 worker 将取出第 10 个。
通过这种方式,您可以拥有 golang 的自然工作流程:) 这也称为生成pre-workers
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// RowInfo holds the job info
type RowInfo struct {
ID int
}
func worker(queue chan RowInfo, done chan bool) {
fmt.Println("Starting worker...")
for {
select {
case row := <-queue:
fmt.Printf("Got row info: %v \n", row)
// Keep it for second so we can see actual queue lock working
time.Sleep(1 * time.Second)
case <-time.After(10 * time.Second):
fmt.Printf("This job is taking way too long. Let's clean it up now by lets say write write in database that job has failed so it can be restarted again when time is right.")
case <-done:
fmt.Printf("Got quit signal... Killing'em all")
break
}
}
}
func handleSigterm(kill chan os.Signal, done chan bool) {
select {
case _ = <-kill:
close(done)
}
}
func main() {
// Do not allow more than 10 records to be in the channel.
queue := make(chan RowInfo, 10)
done := make(chan bool)
kill := make(chan os.Signal, 1)
signal.Notify(kill, os.Interrupt)
signal.Notify(kill, syscall.SIGTERM)
go handleSigterm(kill, done)
for i := 0; i < 3; i++ {
go worker(queue, done)
}
// Should be infinite loop in the end...
go func() {
for i := 0; i < 100; i++ {
fmt.Printf("Queueing: %v \n", i)
queue <- RowInfo{ID: i}
}
}()
<-done
// Give it some time to process things before shutting down. This is bad way of doing things
// but is efficient for this example
time.Sleep(5 * time.Second)
}
至于管理数据库状态,您可以在数据库中显示“进行中”的状态,因此每次选择您时,也要对该行进行更新,以表明正在进行中。这当然是一种方法。通过在 golang 中保留某种映射,我会说你会比需要的更多地折磨你的服务。
- 2 回答
- 0 关注
- 145 浏览
添加回答
举报