1. 前言 之前曾经阅读过 x/sync 包的代码,这些工具在 sync 标准库的基础上封装出更贴合业务的抽象。相对比之下,sync 中提供的内容就更底层一些。最近心血来潮想继续研究下 sync 本身的一些内容,于是就有了这篇博客。本篇博客尝试从源码的角度来讨论其中的三个常用工具,并假设读者有过它们的使用经验,所以不会在使用方式上做过多说明。此外为了更加关注核心逻辑,我们下文贴出的代码会删减掉 race 相关的内容。
在介绍具体的工具前,我们需要先意识到一个问题,就是 golang 通过提供协程来向使用者屏蔽了操作系统层面的进程和线程,所以标准库中提供的各种工具也是用于做协程间同步的,这也就少不了与运行时的相互配合,这里具体来说就是信号量,事实上这篇文章所介绍的三个工具中都用到了信号量来做协程的阻塞与唤醒。我们这里不对信号量的原理做解释,只需要了解在 golang 中它能够提供的功能即可。
2. Mutex 2.1. 基本原理 Mutex 对外仅提供 Lock 和 Unlock 两个方法,并保证同一时间仅能有一个协程从 Lock 方法返回,其他与之竞争的协程在 Unlock 方法被调用前会阻塞在 Lock 方法中。但 golang 并没有限制调用 Unlock 方法的协程一定是成功从 Lock 方法返回的协程,也就是说幸运的协程 a 通过 Lock 获取到锁后,另一个协程 b 也可以调用 Unlock 来解除 a 对锁的占用。
Mutex 核心依赖 atomic 提供的原子操作以及前文提到的 runtime 提供的信号量,通过这两个工具的相互配合来对外提供便捷的方法调用。在加锁时,未能成功获取到锁的协程可能会尝试进行自旋,自旋会为它提供更多的机会来成功获取这个锁。当自旋失败时,协程就会通过信号量阻塞起来,直到锁被释放时才有机会被唤醒继续尝试加锁。由于同一时间只有一个协程能获取到锁,所以阻塞的协程会相对更多。为了避免阻塞的协程长时间获取不到锁导致的饥饿现象,golang 为 Mutex 加入了“饥饿模式”,在饥饿模式下已经阻塞过的协程会优先于新来的协程获取锁。
2.2. 核心数据结构 Mutex 的核心结构自然就是 Mutex 结构体,这个结构体的构成非常简单,具体如下:
1 2 3 4 type Mutex struct { state int32 sema uint32 }
其中 sema 非常简单,仅仅用于作为信号量来实现协程的阻塞与唤醒;而 state 用于记录 Mutex 当前的状态,右边第一位代表当前 Mutex 是否已上锁,第二位代表当前是否有协程被唤醒,第三位代表当前 Mutex 是否处于“饥饿模式”,从第四位开始到最左边共计 29 位用于记录当前有多少个协程被阻塞。为了方便操作,标准库提供了一些常量来记录这些信息,具体如下:
1 2 3 4 5 6 const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota )
通过上面的这段描述我们就可以发现,一个零值的 Mutex 就是一个有效的初态的锁,所以我们可以简单通过 Mutex{}
或 &Mutex{}
来初始化一个锁,标准库也就没有对外提供类似 NewMutex 这样的函数。
2.3. 加锁过程 Mutex 的加锁被分成了两个函数:作为主体对外暴露的 Lock 方法和 Lock 内部可能调用的 lockSlow 方法。其中,Lock 方法的定义非常简单,具体如下:
1 2 3 4 5 6 7 8 func (m *Mutex) Lock () { if atomic.CompareAndSwapInt32(&m.state, 0 , mutexLocked) { return } m.lockSlow() }
可以看到 Lock 方法仅仅就是通过 CAS 来尝试将 state 的最低位置一,对于一个初态的锁而言这一个简单的操作就足够了,所以这被称为 fast-path。但事实上锁竞争是不可避免的,否则就不需要使用 Mutex 了,对于这类情况,流程会进入到 lockSlow 方法中。
其实这种 fast-path + slow-path 的编码方式在 golang 标准库的好多地方都可以看到,这样写的一个好处在于外层的 Lock 方法足够简单以至于可以被内联到调用处,而一旦形成内联,那么 fast-path 就真的非常快了。
回到 lockSlow,这个函数应该说是 Mutex 的核心所在了。我们直接给出这个函数的流程注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 func (m *Mutex) lockSlow () { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new &mutexWoken == 0 { throw("sync: inconsistent mutex state" ) } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new ) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1 ) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state" ) } delta := int32 (mutexLocked - 1 <<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
2.4. 解锁过程 解锁过程同加锁过程一样,也是由 Unlock 和 unlockSlow 来配合实现的,但解锁相较于加锁要简单很多:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (m *Mutex) Unlock () { new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new ) } } func (m *Mutex) unlockSlow (new int32 ) { if (new +mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex" ) } if new &mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1 <<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new ) { runtime_Semrelease(&m.sema, false , 1 ) return } old = m.state } } else { runtime_Semrelease(&m.sema, true , 1 ) } }
3. RWMutex 3.1. 基本原理 和 Mutex 不同,RWMutex 将加锁解锁这一操作细分成读锁和写锁,在使用效果上,同一时间可以有多个协程持有读锁,但仅会有一个协程持有写锁。但正是由于这一点,所以读锁相对写锁而言更容易获取,因为两次读锁之间不会有锁冲突,这很可能导致试图加写锁的协程饥饿,也就是长时间获取不到写锁。
为了避免这种现象,RWMutex 会保证当一个协程尝试获取写锁后,尽管它可能会失败,但在其之后尝试获取读锁的协程都没办法成功获取到锁,直至已经持有读锁的协程释放了锁,前面尝试获取写锁的协程拿到并释放锁后,这些新来的协程才能继续获取读锁。
3.2. 核心数据结构 RWMutex 的核心数据结构也很简单,并且同 Mutex 一样,一个零值的结构体就是一个初态的读写锁,结构体的具体字段如下:
1 2 3 4 5 6 7 type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount int32 readerWait int32 }
除了这个结构体外,还有一个很重要的常量定义 const rwmutexMaxReaders = 1 << 30
,这个东西是用来与 readerCount 字段配合使用的,具体而言,我们前面提到当一个协程尝试获取写锁但失败时,它会陷入阻塞,但在它之后的所有尝试获取读锁的协程都会阻塞。为了达成这个效果,我们就需要在尝试获取写锁时,在 RWMutex 加一个标记表示有协程在尝试获取写锁,这个标记在实现上是通过 readerCount 与 rwmutexMaxReaders 来相互配合的。当协程尝试加写锁时,会将 readerCount 减掉 rwmutexMaxReaders,这会导致 readerCount 变成一个很小的负数,这样在协程尝试加读锁时,一旦检测到这个值是负数,那么它就知道现在有协程在尝试获取写锁,此时它就应该乖乖地降低自己的优先级。
3.3. 写锁加锁过程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (rw *RWMutex) Lock () { rw.w.Lock() r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false , 0 ) } }
3.4. 写锁解锁过程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (rw *RWMutex) Unlock () { r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { throw("sync: Unlock of unlocked RWMutex" ) } for i := 0 ; i < int (r); i++ { runtime_Semrelease(&rw.readerSem, false , 0 ) } rw.w.Unlock() }
3.5. 读锁加锁过程 1 2 3 4 5 6 7 8 func (rw *RWMutex) RLock () { if atomic.AddInt32(&rw.readerCount, 1 ) < 0 { runtime_SemacquireMutex(&rw.readerSem, false , 0 ) } }
3.6. 读锁解锁过程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (rw *RWMutex) RUnlock () { if r := atomic.AddInt32(&rw.readerCount, -1 ); r < 0 { rw.rUnlockSlow(r) } } func (rw *RWMutex) rUnlockSlow (r int32 ) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { throw("sync: RUnlock of unlocked RWMutex" ) } if atomic.AddInt32(&rw.readerWait, -1 ) == 0 { runtime_Semrelease(&rw.writerSem, false , 1 ) } }
4. WaitGroup 4.1. 基本原理 WaitGroup 适合某个/些协程“等待”另一个/些协程的场景,比如一个主协程分出 n 个子协程去调不同的 rpc,然后 Wait 这些协程直到它们全部返回,再收集这些调用结果;亦或是一个主协程去做一些事,多个子协程在 Wait 它把这件事做完。总而言之,Add/Done 被那些“做某件事”的协程调用,Wait 则被那个“等待结果”的协程调用。
但 Add 方法并不限制传入的值一定是 1,所以 WaitGroup 的使用方式非常灵活。这就要求我们需要记录当前某个 WaitGroup Add 了多少,相应的需要调用同样多次 Done 才能让调用 Wait 的协程被唤醒。而调用 Wait 的协程又可能不止一个,所以我们还需要记录这个数量,在 Done 的调用次数达标时,这些协程都应该被唤醒。
4.2. 核心数据结构 WaitGroup 的核心数据结构虽然定义很简单,但理解起来却有一定难度,和前两个结构体一样,一个零值的结构体就是一个初态的 WaitGroup,这个结构体的定义如下:
1 2 3 4 type WaitGroup struct { noCopy noCopy state1 [3 ]uint32 }
首先是 noCopy 这个结构,顾名思义,它用于检查 WaitGroup 被创建后是否被复制,一旦某个 WaitGroup 被复制,go vet
工具就可以将这个复制操作检查出来。
然后是 state1 这个结构,它最终是通过 WaitGroup.state 这个方法来使用的,这个方法的定义是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 func (wg *WaitGroup) state () (statep *uint64 , semap *uint32 ) { if uintptr (unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64 )(unsafe.Pointer(&wg.state1)), &wg.state1[2 ] } else { return (*uint64 )(unsafe.Pointer(&wg.state1[1 ])), &wg.state1[0 ] } }
可以看到,state1 最终会通过 state 方法拆解出 state 和 sema 两个值,前者用于记录 WaitGroup 的状态,后者作为信号量来实现协程的阻塞与唤醒。和 Mutex 不同,这里的 state 需要是 64 位的,因为它前 32 位记录 Add 的总量,后 32 位记录 Wait 的协程数量,如果仍然使用 32 位的 state,那么这两个内容的最大值都会很受限。
而一旦 state 是 64 位的,那我们就需要通过 64 位的原子操作来操作这个字段,但 64 位的原子操作要求被操作数的地址是 64 位对齐的,也就是首地址模 8 需要为 0。64 位系统的编译器会保证这一点,但 32 位系统的编译器却只保证操作数地址是 32 位对齐的,也就是说可能出现操作数首地址模 8 后等于 4 的情况。所以为了在这样的情况下仍然能够原子性地操作 uint64,就需要手动进行内存对齐,也就是跳过 state1 的前 32 位。
4.3. Add/Done 过程 WaitGroup.Done 实际上就是 WaitGroup.Add(-1),所以我们这里仅给出 Add 方法的代码解读:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (wg *WaitGroup) Add (delta int ) { statep, semap := wg.state() state := atomic.AddUint64(statep, uint64 (delta)<<32 ) v := int32 (state >> 32 ) w := uint32 (state) if v < 0 { panic ("sync: negative WaitGroup counter" ) } if w != 0 && delta > 0 && v == int32 (delta) { panic ("sync: WaitGroup misuse: Add called concurrently with Wait" ) } if v > 0 || w == 0 { return } if *statep != state { panic ("sync: WaitGroup misuse: Add called concurrently with Wait" ) } *statep = 0 for ; w != 0 ; w-- { runtime_Semrelease(semap, false , 0 ) } }
4.4. Wait 过程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (wg *WaitGroup) Wait () { statep, semap := wg.state() for { state := atomic.LoadUint64(statep) v := int32 (state >> 32 ) w := uint32 (state) if v == 0 { return } if atomic.CompareAndSwapUint64(statep, state, state+1 ) { runtime_Semacquire(semap) if *statep != 0 { panic ("sync: WaitGroup is reused before previous Wait has returned" ) } return } } }