Go并发实战–sync WaitGroup

前言

waitgroup也是一个非常有用的并发工具,有点像是Java中的CyclicBarrier,只不过Go中的WaitGroup等待的是协程而已。 通常来说,WaitGroup是go并发中最常用的工具了,在起协程并发做一些事儿,我们可以通过WaitGroup了表达这一组协程的任务是否完成,已决定是否继续往下走,或者取任务结果,下面来看一下WaitGroup的使用及实现。

语法基础

WaitGroup的核心关注点是:Add()、Done()、Wait()三个函数 Add函数主要为WaitGroup的等待数+1或者+n Done函数调用的也是Add函数,主要用于-1操作 Wait函数是指阻塞当前协程,直到等待数归为0才继续向下执行 下面来看一个demo:

func main() {
    waitGroup := &sync.WaitGroup{}
    DoSomething(waitGroup)
    waitGroup.Wait() // 这里会阻塞main,直到所有的任务都完成
    fmt.Println("end")
}

func DoSomething(waitGroup *sync.WaitGroup) {
    for i:=0;i <10;i++ {
        waitGroup.Add(1)
        go func(waitGroup *sync.WaitGroup) {
            fmt.Print("1-")
            defer waitGroup.Done()
        }(waitGroup)
    }
}

输出:

Go并发实战--sync WaitGroup

实现原理

首先来看WaitGroup的结构体:

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

noCopy共64位,高32位是一个计数器,低32位代表有多少在等待,64位原子操作需要64位对齐,但32位编译器不能确保对齐。因此,我们分配12个字节,然后使用其中对齐的8个字节作为状态,另外4个字节作为SEMA的存储。 state1 保存的是状态信息 接下来看一下核心的两个函数Add()、Wait() Add(): 1、添加将delta(可能为负)加到waitgroup计数器上。 2、如果计数器变为零,则所有在等待时阻塞的goroutine都被释放。 3、如果计数器为负,则直接panic。

需要注意的是: 1、当计数器为零时发生的具有正增量的调用必须在等待之前发生,但任何时候都可能发生负增量的调用。这意味着要Add的函数调用应该在创建goroutine或等待的其他事件的语句之前执行。 2、如果waitgroup被重用以等待几个独立的事件集,则必须在返回所有以前的wait调用之后进行新的add调用。

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // 与wait变为同步操作
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if race.Enabled && delta > 0 && v == int32(delta) {
//第一个增量必须与wait同步。
//需要将其变为为一个读取,因为可以有几个从0开始的并发wg.counter转换。
        race.Read(unsafe.Pointer(semap))
    }
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
        return
    }
//当waiters>0时,此goroutine已将counter设置为0。
//现在状态不能同时突变:
//-添加不能与等待同时发生,
//-如果看到counter==0,则wait不会增加waiters。
//仍然执行简单的检查以检测WaitGroup的误用。
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false)
    }
}

Wait(): Wait函数主要用于阻塞那些调用Wait的goroutine,直到waitgroup的任务计数器为0,才会放行,下面来看一下源码:

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep
        race.Disable()
    }
    // 循环检查计数器情况
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            // 如果技术为0的话就不用等待了,完成放行
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // 如果有有新的调用wait函数的,添加新的等待者,等待者计数器+1
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
            // 等待和新添加的是同步的
            // 只能为第一个竞争者进行写操作,否则的话会产生互相竞争
          race.Write(unsafe.Pointer(semap))
            }
            runtime_Semacquire(semap)
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

关于WaitGroup暂时介绍这么多。