为了账号安全,请及时绑定邮箱和手机立即绑定

去 PubSub 没有互斥锁?

去 PubSub 没有互斥锁?

Go
当年话下 2022-10-24 09:35:31
我将在网站后端实现通知系统,每次页面访问都会为用户订阅页面上显示的一些数据,当系统发生变化时,他会收到通知。例如,有人正在查看包含新闻文章的页面,当发布新文章时,我想通知用户,以便他可以通过 js 或重新加载页面来获取这些新文章。手动或自动。为了实现这一点,我将以发布/订阅的方式使用频道。例如,会有一个“新闻”频道。创建新文章时,该频道将收到该文章的 id。当用户打开一个页面并订阅“新闻”频道(可能通过 websocket)时,必须有一个此新闻频道的订阅者列表,他将作为订阅者添加到该列表中以得到通知。就像是:type Channel struct {   ingres <-chan int // news article id   subs [] chan<- int   mx sync.Mutex}这些中的每一个都将有一个 goroutine 将进入的内容分发到 subs 列表中。现在我看到的问题,可能是过早的优化,是会有很多频道和很多来来往往的订户。这意味着会有很多带有互斥锁的世界末日事件。例如,如果有 10 000 个用户在线,订阅了新闻频道,goroutine 将必须发送 10k 个通知,而 subs 切片将被锁定,因此新订阅者将不得不等待互斥锁解锁。现在将其乘以 100 个通道,我认为我们遇到了问题。因此,我正在寻找一种方法来添加和删除订阅者,而不会阻止其他订阅者被添加或删除,或者可能只是在可接受的时间内全面接收通知。“等待所有子程序接收”问题可以通过 goroutine 解决每个子程序的超时问题,因此在收到 id 后,将创建 10k 个 goroutines 并且可以立即解锁互斥锁。但是,它仍然可以添加多个渠道。
查看完整描述

1 回答

?
慕容森

TA贡献1853条经验 获得超18个赞

根据链接的评论,我想出了这段代码:


package notif


import (

    "context"

    "sync"

    "time"

    "unsafe"

)


type Client struct {

    recv   chan interface{}

    ch     *Channel

    o      sync.Once

    ctx    context.Context

    cancel context.CancelFunc

}


// will be nil if this client is write-only

func (c *Client) Listen() <-chan interface{} {

    return c.recv

}


func (c *Client) Close() {

    select {

    case <-c.ctx.Done():

    case c.ch.unsubscribe <- c:

    }

}


func (c *Client) Done() <-chan struct{} {

    return c.ctx.Done()

}


func (c *Client) doClose() {

    c.o.Do(func() {

        c.cancel()

        if c.recv != nil {

            close(c.recv)

        }

    })

}


func (c *Client) send(msg interface{}) {

    // write-only clients will not handle any messages

    if c.recv == nil {

        return

    }

    t := time.NewTimer(c.ch.sc)

    select {

    case <-c.ctx.Done():

    case c.recv <- msg:

    case <-t.C:

        // time out/slow consumer, close the connection

        c.Close()

    }

}


func (c *Client) Broadcast(payload interface{}) bool {

    select {

    case <-c.ctx.Done():

        return false

    default:

        c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}

        return true

    }

}


type envelope struct {

    Message interface{}

    Sender  uintptr

}


// leech is channel-blocking so goroutine should be called internally to make it non-blocking

// this is to ensure proper order of leeched messages.

func NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {

    return &Channel{

        name:        name,

        ingres:      make(chan interface{}, 1000),

        subscribe:   make(chan *Client, 1000),

        unsubscribe: make(chan *Client, 1000),

        aud:         make(map[*Client]struct{}, 1000),

        ctx:         ctx,

        sc:          slowConsumer,

        empty:       emptyCh,

        leech:       leech,

    }

}


type Channel struct {

    name        string

    ingres      chan interface{}

    subscribe   chan *Client

    unsubscribe chan *Client

    aud         map[*Client]struct{}

    ctx         context.Context

    sc          time.Duration

    empty       chan string

    leech       func(interface{})

}


func (ch *Channel) Id() string {

    return ch.name

}


// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode

// in which case the client will not be disconnected for being slow reader.

func (ch *Channel) Subscribe(writeOnly ...bool) *Client {

    c := &Client{

        ch: ch,

    }

    if len(writeOnly) == 0 || writeOnly[0] == false {

        c.recv = make(chan interface{})

    }

    c.ctx, c.cancel = context.WithCancel(ch.ctx)

    ch.subscribe <- c

    return c

}


func (ch *Channel) Broadcast() chan<- interface{} {

    return ch.ingres

}


// returns once context is cancelled

func (ch *Channel) Start() {

    for {

        select {

        case <-ch.ctx.Done():

            for cl := range ch.aud {

                delete(ch.aud, cl)

                cl.doClose()

            }

            return

        case cl := <-ch.subscribe:

            ch.aud[cl] = struct{}{}


        case cl := <-ch.unsubscribe:

            delete(ch.aud, cl)

            cl.doClose()

            if len(ch.aud) == 0 {

                ch.signalEmpty()

            }


        case msg := <-ch.ingres:

            e, ok := msg.(*envelope)

            if ok {

                msg = e.Message

            }

            for cl := range ch.aud {

                if ok == false || uintptr(unsafe.Pointer(cl)) != e.Sender {

                    go cl.send(e.Message)

                }

            }

            if ch.leech != nil {

                ch.leech(msg)

            }

        }

    }

}


func (ch *Channel) signalEmpty() {

    if ch.empty == nil {

        return

    }


    select {

    case ch.empty <- ch.name:

    default:

    }

}


type subscribeRequest struct {

    name string

    recv chan *Client

    wo   bool

}


type broadcastRequest struct {

    name string

    recv chan *Channel

}


type brokeredChannel struct {

    ch     *Channel

    cancel context.CancelFunc

}


type brokerLeech interface {

    Match(string) func(interface{})

}


func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {

    return &Broker{

        chans:     make(map[string]*brokeredChannel, 100),

        subscribe: make(chan *subscribeRequest, 10),

        broadcast: make(chan *broadcastRequest, 10),

        ctx:       ctx,

        sc:        slowConsumer,

        empty:     make(chan string, 10),

        leech:     leech,

    }

}


type Broker struct {

    chans     map[string]*brokeredChannel

    subscribe chan *subscribeRequest

    broadcast chan *broadcastRequest

    ctx       context.Context

    sc        time.Duration

    empty     chan string

    leech     brokerLeech

}


// returns once context is cancelled

func (b *Broker) Start() {

    for {

        select {

        case <-b.ctx.Done():

            return

        case req := <-b.subscribe:

            ch, ok := b.chans[req.name]

            if ok == false {

                ctx, cancel := context.WithCancel(b.ctx)

                var l func(interface{})

                if b.leech != nil {

                    l = b.leech.Match(req.name)

                }

                ch = &brokeredChannel{

                    ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),

                    cancel: cancel,

                }

                b.chans[req.name] = ch

                go ch.ch.Start()

            }

            req.recv <- ch.ch.Subscribe(req.wo)


        case req := <-b.broadcast:

            ch, ok := b.chans[req.name]

            if ok == false {

                ctx, cancel := context.WithCancel(b.ctx)

                var l func(interface{})

                if b.leech != nil {

                    l = b.leech.Match(req.name)

                }

                ch = &brokeredChannel{

                    ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),

                    cancel: cancel,

                }

                b.chans[req.name] = ch

                go ch.ch.Start()

            }

            req.recv <- ch.ch


        case name := <-b.empty:

            if ch, ok := b.chans[name]; ok {

                ch.cancel()

                delete(b.chans, name)

            }

        }

    }

}


// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode

// in which case the client will not be disconnected for being slow reader.

func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {

    req := &subscribeRequest{

        name: name,

        recv: make(chan *Client),

        wo:   len(writeOnly) > 0 && writeOnly[0] == true,

    }

    b.subscribe <- req

    c := <-req.recv

    close(req.recv)

    return c

}


func (b *Broker) Broadcast(name string) chan<- interface{} {

    req := &broadcastRequest{name: name, recv: make(chan *Channel)}

    b.broadcast <- req

    ch := <-req.recv

    close(req.recv)

    return ch.ingres

}



查看完整回答
反对 回复 2022-10-24
  • 1 回答
  • 0 关注
  • 71 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信