为了账号安全,请及时绑定邮箱和手机立即绑定

为什么我的golang无锁队列总是停在那里?

为什么我的golang无锁队列总是停在那里?

Go
慕侠2389804 2021-05-12 10:14:47
这是我的代码:package mainimport (    "sync/atomic"    "unsafe"    "sync"    "fmt"    "time")const (    MAX_DATA_SIZE = 100)// lock free queuetype Queue struct {    head unsafe.Pointer    tail unsafe.Pointer}// one node in queuetype Node struct {    val interface{}    next unsafe.Pointer}// queue functionsfunc (self *Queue) enQueue(val interface{}) {    newValue := unsafe.Pointer(&Node{val: val, next: nil})    var tail,next unsafe.Pointer    for {        tail = self.tail        next = ((*Node)(tail)).next        if next != nil {            atomic.CompareAndSwapPointer(&(self.tail), tail, next)        }else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue){            break        }    }}func (self *Queue) deQueue() (val interface{}, success bool){    var head,tail,next unsafe.Pointer    for {        head = self.head        tail = self.tail        next = ((*Node)(head)).next        if head == tail {            if next == nil {                return nil, false            }else {                atomic.CompareAndSwapPointer(&(self.tail), tail, next)            }        }else {            val = ((*Node)(next)).val            if atomic.CompareAndSwapPointer(&(self.head), head, next) {                return val, true            }        }    }    return}func main() {    var wg sync.WaitGroup    wg.Add(20)    queue := new(Queue)    queue.head = unsafe.Pointer(new(Node))    queue.tail = queue.head    for i := 0; i < 10; i++ {        go func() {            defer wg.Done()            for j := 0; j < MAX_DATA_SIZE; j++ {                t := time.Now()                queue.enQueue(t)                fmt.Println("enq = ", t)            }        }()    }问题是,有时代码可以正常运行,但是有时它会失败并且只会卡住而没有任何响应。我的代码有什么问题吗?
查看完整描述

2 回答

?
陪伴而非守候

TA贡献1757条经验 获得超8个赞

这是上面改写的通道,建议使用@mkb(排除无限队列大小)。


它不会锁定。


我建议您使用渠道,除非您有充分的理由不这样做,因为Go团队已花费大量精力使其变得可靠,高性能且易于使用。


package main


import (

    "fmt"

    "runtime"

    "sync"

    "time"

)


const (

    MAX_DATA_SIZE = 100

)


func main() {

    runtime.GOMAXPROCS(4)

    var wg sync.WaitGroup

    wg.Add(20)

    queue := make(chan time.Time, 10)


    for i := 0; i < 10; i++ {

        go func() {

            defer wg.Done()

            for j := 0; j < MAX_DATA_SIZE; j++ {

                t := time.Now()

                queue <- t

                fmt.Println("enq = ", t)

            }

        }()

    }


    for i := 0; i < 10; i++ {

        go func() {

            defer wg.Done()

            for j := 0; j < MAX_DATA_SIZE; j++ {

                val := <-queue

                fmt.Println("deq = ", val)

            }

        }()

    }


    wg.Wait()

}


查看完整回答
反对 回复 2021-05-17
?
MM们

TA贡献1886条经验 获得超2个赞

这段代码中有很多活跃的等待者,我强烈建议像Nick的漂亮代码一样,干净地使用频道。


但是,这是我对确切的原始问题“为什么会卡住?”的回答。:没有保证每个goroutine何时屈服以让其他goroutine执行,并且很可能在无限循环内永远不会屈服。


您可以通过在每个可能无限的for循环中使用runtime.Gosched()来解决此问题:

Gosched产生处理器,从而允许其他goroutine运行。它不会挂起当前的goroutine,因此执行会自动恢复。


此增强的代码几乎可以与原始代码一样快地运行,但是永远不会挂起:


package main


import (

    "fmt"

    "runtime"

    "sync"

    "sync/atomic"

    "time"

    "unsafe"

)


const (

    MAX_DATA_SIZE = 100

)


// lock free queue

type Queue struct {

    head unsafe.Pointer

    tail unsafe.Pointer

}


// one node in queue

type Node struct {

    val  interface{}

    next unsafe.Pointer

}


// queue functions

func (self *Queue) enQueue(val interface{}) {

    newValue := unsafe.Pointer(&Node{val: val, next: nil})

    var tail, next unsafe.Pointer

    for {

        tail = self.tail

        next = ((*Node)(tail)).next

        if next != nil {

            atomic.CompareAndSwapPointer(&(self.tail), tail, next)

        } else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue) {

            break

        }

        runtime.Gosched()

    }

}


func (self *Queue) deQueue() (val interface{}, success bool) {

    var head, tail, next unsafe.Pointer

    for {

        head = self.head

        tail = self.tail

        next = ((*Node)(head)).next

        if head == tail {

            if next == nil {

                return nil, false

            } else {

                atomic.CompareAndSwapPointer(&(self.tail), tail, next)

            }

        } else {

            val = ((*Node)(next)).val

            if atomic.CompareAndSwapPointer(&(self.head), head, next) {

                return val, true

            }

        }

        runtime.Gosched()

    }

    return

}


func main() {

    var wg sync.WaitGroup

    wg.Add(20)

    queue := new(Queue)

    queue.head = unsafe.Pointer(new(Node))

    queue.tail = queue.head


    for i := 0; i < 10; i++ {

        go func() {

            defer wg.Done()

            for j := 0; j < MAX_DATA_SIZE; j++ {

                t := time.Now()

                queue.enQueue(t)

                fmt.Println("enq = ", t)

            }

        }()

    }


    for i := 0; i < 10; i++ {

        go func() {

            ok := false

            var val interface{}

            defer wg.Done()

            for j := 0; j < MAX_DATA_SIZE; j++ {

                val, ok = queue.deQueue()

                for !ok {

                    val, ok = queue.deQueue()

                    runtime.Gosched()

                }

                fmt.Println("deq = ", val)

            }

        }()

    }


    wg.Wait()

}


查看完整回答
反对 回复 2021-05-17
  • 2 回答
  • 0 关注
  • 454 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信