乘风原创程序

  • golang WaitGroup源码解析
  • 2020/11/26 10:01:29
  • ? 目录

    WaitGroup的结构体

    WaitGroup的方法

    state()

    Add()

    Done()

    Wait()


    ?

    ???? 上一篇学习了WaitGroup的用法,这篇学习下WaitGroup的源码。源码路径:$GOROOT/src/sync/waitgroup.go


    ???? WaitGroup的基本机制是通过计数器记录被等待goroutine的数目,当goroutine退出后计数器会清零,同时通过信号量机制通知Wait函数解除阻塞。WaitGroup内部有两个计数器,可以称为v和w(借用源码里的计数器变量名),v记录了被等待的goroutine的数目,WaitGroup的Add()和Done()方法负责更新v的值;w记录了调用Wait()的goroutine的数目,Wait()方法负责更新w的值。

    WaitGroup的结构体

    type WaitGroup struct {
            //表示禁止复制,可以忽略
    	noCopy noCopy
    
            //计数器,虽然有12个字节,但实际只使用了8个字节。定义12个字节跟64位地址对齐有关,目的是为了兼容32位编译器
    	state1 [12]byte
    
            //信号量
    	sema   uint32
    }

    核心成员就是state1和sema。state1用作计数器,实际只用了8个字节,高4个字节记录被等待goroutine的数目,低4个字节记录调用Wait()的goroutine的数目:

    |________|________|

    ????? v??????????????? w

    WaitGroup的方法

    state()

    获取实际的8个字节作为state的值

    func (wg *WaitGroup) state() *uint64 {
    	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
    		return (*uint64)(unsafe.Pointer(&wg.state1))
    	} else {
    		return (*uint64)(unsafe.Pointer(&wg.state1[4]))
    	}
    }

    如果state1的地址是8字节对齐的,直接取state1的低8字节;如果不是8字节对齐的,取state1的高8个字节。

    Add()

    非常重要,有两个作用,一个更新v的值,另一个当v变成0时发送信号量(下面的代码去掉了源码里的race判断,race判断是竞争检测相关的,只有当运行程序时带-race参数才起作用,不是WaitGroup的核心功能,去掉后方便看清程序主体)

    func (wg *WaitGroup) Add(delta int) {
            //取出state的值
    	statep := wg.state()
    
            //原子操作的方式把入参delta的值加到了state的高4字节,也就是v = v + delta
    	state := atomic.AddUint64(statep, uint64(delta)<<32)
    
            //state右移32位得到v的值
    	v := int32(state >> 32)
    
            //state的低32位是w的值
    	w := uint32(state)
    
            //v变成负数时就panic。比如第一次调用Add就传负数,就会出现这种panic
    	if v < 0 {
    		panic("sync: negative WaitGroup counter")
    	}
    
            //正常Add在Wait之前调用,w是0才对
    	if w != 0 && delta > 0 && v == int32(delta) {
    		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    	}
            
            //正常Add之后程序返回
    	if v > 0 || w == 0 {
    		return
    	}
    	
            //又一个误用检查,没看懂
    	if *statep != state {
    		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    	}
    
    	//把计数器v和w都置0
    	*statep = 0
    
            //发送w个信号量,也就是有多少goroutine调用了Wait()就发送多少信号量,让他们都解除阻塞
    	for ; w != 0; w-- {
    		runtime_Semrelease(&wg.sema, false)
    	}
    }

    Done()

    实际上封装的Add()

    func (wg *WaitGroup) Done() {
    	wg.Add(-1)
    }

    看到有人讨论Add()能不能传负数的问题,肯定是可以传负数的,只是得保证计数器v的值不能是负数。比如Add(2)后再Add(-1)是没问题的如果第一次调用Add()就传负数,会出现panic。

    Wait()

    两个作用,更新w的值和阻塞调用方

    func (wg *WaitGroup) Wait() {
            //获取state的值
    	statep := wg.state()
    
    	for {
                    //原子操作的方式获取state的值
    		state := atomic.LoadUint64(statep)
    		v := int32(state >> 32)
    		w := uint32(state)
                    
                    //如果等待的goroutine的数量为0就不用等待直接返回
    		if v == 0 {
    			return
    		}
    
    		// 调用Wait()的goroutine的数目加1,Add()发送信号量的时候知道多少goroutine在等待
    		if atomic.CompareAndSwapUint64(statep, state, state+1) {
                            //获取信号量,如果拿不到就阻塞,如果拿到了就继续执行
    			runtime_Semacquire(&wg.sema)
                            
                            //程序运行到这里说明Add()已经发送了信号量,Wait()解除阻塞了,这个时间state的值是0才正常。非0是个异常情况,这里panic
    			if *statep != 0 {
    				panic("sync: WaitGroup is reused before previous Wait has returned")
    			}
    			return
    		}
    	}
    }
    

    总结

    通过源码可以看出来使用WaitGroup时得注意各个方法的调用顺序,顺序用错会导致panic。另外如果Add()加的数量大于Done()减的数量的话,会导致程序出现deadlock。

    ?

    ?

    本文地址:https://blog.csdn.net/haowunanhai/article/details/110121784