3 回答
TA贡献1810条经验 获得超4个赞
锁定的全球地图为您提供了良好的开端。您可以为每个“事务”设置一个工作器,处理程序通过通道向他们发送请求,使用锁定的地图来跟踪通道。工作人员可以在收到特殊请求时关闭交易。您不希望悬空事务成为问题,因此您应该安排在超时后发送人为关闭请求。
这不是唯一的方法,尽管它可能很方便。如果您只需要在其他地方处理事务时让某些请求等待,那么可能有一个带有*sync.Mutexes映射的构造,而不是与工作程序 goroutine 通信的通道,这样可以更好地利用资源。(现在在 bgp 的回答中或多或少有这种方法的代码。)
渠道方法的一个例子如下;除了在每个事务中序列化工作之外,它还演示了如何使用close和sync.WaitGroup为这样的设置进行正常关闭和超时。它在操场上。
package main
import (
"fmt"
"log"
"sync"
"time"
)
// Req represents a request. In real use, if there are many kinds of requests, it might be or contain an interface value that can point to one of several different concrete structs.
type Req struct {
id int
payload string // just for demo
// ...
}
// Worker represents worker state.
type Worker struct {
id int
reqs chan *Req
// ...
}
var tasks = map[int]chan *Req{}
var tasksLock sync.Mutex
const TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher
// for graceful shutdown, you probably want to be able to wait on all workers to exit
var tasksWg sync.WaitGroup
func (w *Worker) Work() {
defer func() {
tasksLock.Lock()
delete(tasks, w.id)
if r := recover(); r != nil {
log.Println("worker panic (continuing):", r)
}
tasksLock.Unlock()
tasksWg.Done()
}()
for req := range w.reqs {
// ...do work...
fmt.Println("worker", w.id, "handling request", req)
if req.payload == "close" {
fmt.Println("worker", w.id, "quitting because of a close req")
return
}
}
fmt.Println("worker", w.id, "quitting since its channel was closed")
}
// Handle dispatches the Request to a Worker, creating one if needed.
func (r *Req) Handle() {
tasksLock.Lock()
defer tasksLock.Unlock()
id := r.id
reqs := tasks[id]
if reqs == nil {
// making a buffered channel here would let you queue up
// n tasks for a given ID before the the Handle() call
// blocks
reqs = make(chan *Req)
tasks[id] = reqs
w := &Worker{
id: id,
reqs: reqs,
}
tasksWg.Add(1)
go w.Work()
time.AfterFunc(TimeoutDuration, func() {
tasksLock.Lock()
if reqs := tasks[id]; reqs != nil {
close(reqs)
delete(tasks, id)
}
tasksLock.Unlock()
})
}
// you could close(reqs) if you get a request that means
// 'end the transaction' with no further info. I'm only
// using close for graceful shutdown, though.
reqs <- r
}
// Shutdown asks the workers to shut down and waits.
func Shutdown() {
tasksLock.Lock()
for id, w := range tasks {
close(w)
// delete so timers, etc. won't see a ghost of a task
delete(tasks, id)
}
// must unlock b/c workers can't finish shutdown
// until they can remove themselves from maps
tasksLock.Unlock()
tasksWg.Wait()
}
func main() {
fmt.Println("Hello, playground")
reqs := []*Req{
{id: 1, payload: "foo"},
{id: 2, payload: "bar"},
{id: 1, payload: "baz"},
{id: 1, payload: "close"},
// worker 2 will get closed because of timeout
}
for _, r := range reqs {
r.Handle()
}
time.Sleep(75*time.Millisecond)
r := &Req{id: 3, payload: "quux"}
r.Handle()
fmt.Println("worker 2 should get closed by timeout")
time.Sleep(75*time.Millisecond)
fmt.Println("worker 3 should get closed by shutdown")
Shutdown()
}
TA贡献1839条经验 获得超15个赞
需要保持一个全局互斥锁来锁定对并发请求发生的映射的访问,然后从那里使用互斥锁或计数器,然后确保它没有死锁,然后垃圾收集(或仔细引用计数)旧请求条目
这似乎过于复杂了。这是我将如何做到的:
所有地图内容都应该由一个线程(您的调度程序)处理,因此您不必处理锁定。这假设工作时间远大于调度时间。调度程序跟踪每个 ID 的通道和计数器(显然在地图中)。
唯一的复杂问题是如何处理“goroutine 认为它已经完成了 ID 的工作”与“调度员刚刚发现更多工作”的竞争。答案是工作人员请求清理,但调度员决定清理请求是否可能。
所以这里是代码的工作方式:
1) 调度进程从单个输入通道读取。它获得两种类型的请求:“新工作”(来自外部)和“完成工作”(来自工作人员)。两个请求都包含一个 ID。
2) Dispatcher 收到“New Work”消息:通过 ID 在地图中查找。如果您找到一个频道 + 一个计数,则将作品发送到该频道并增加计数。(*) 如果你什么也没找到,在地图中创建一个新的通道 + 计数,将工作发送到通道(也增加计数),然后在该通道上创建一个工作程序(go-routine)读取。
3)worker goroutine 显然会从通道中拉出“新工作”并完成工作。完成后,它将向 Dispatcher 发送“完成工作”请求。
4) 调度员收到“完成工作”消息。在地图中查找并找到频道 + 计数器。减少计数器。如果为零,则向工作人员发送“退出”消息,并删除地图中的条目。
5) 如果工作 goroutine 收到“退出”消息(而不是工作消息),它就会直接退出。(请注意,当旧工人退出时,可以在该 ID 上创建第二个工人的竞争很小。但旧工人只会处理退出消息,所以没关系。旧工人会清理自己向上,包括旧频道。)
如果您的请求足够慢,则地图中一次将只有一个条目。另一个极端是,如果您对同一 ID 的请求足够快,则该 ID 的通道将保持活动状态(只是计数器会上下波动)。
(*) 注意:如果您将频道设置为 5 深,并且 6 条消息排队,调度程序将停止。我认为在这种情况下您可以扩大频道深度,但我不确定。
- 3 回答
- 0 关注
- 166 浏览
添加回答
举报