Go 语言在 sync
包中提供了用于同步的一些基本原语,sync.Mutex
就是其中最常用的一个。
Go 语言的 sync.Mutex
由两个字段 state
和 sema
组成。其中 state
表示当前互斥锁的状态,而 sema
是用于控制锁状态的信号量。
// sync/mutex.go 25行
type Mutex struct {
state int32
sema uint32
}
上述两个字段加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked
、mutexWoken
和 mutexStarving
,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:
int32 中的不同位分别表示了不同的状态:
mutexLocked
— 表示互斥锁的锁定状态;mutexWoken
— 表示从正常模式被唤醒;mutexStarving
— 当前的互斥锁进入饥饿状态;waitersCount
— 当前互斥锁上等待的 Goroutine 个数在默认情况下,互斥锁的所有状态位都是 0,即默认为未锁定状态。同时也表明 Mutex
是不需要初始化的。
源码中也提供了相关常量:
// sync/mutex.go 36
const (
mutexLocked = 1 << iota // 1 0001 含义:用最后一位表示当前对象锁的状态,0-未锁住 1-已锁住
mutexWoken // 2 0010 含义:用倒数第二位表示当前对象是否被唤醒 0-唤醒 1-未唤醒
mutexStarving // 4 0100 含义:用倒数第三位表示当前对象是否为饥饿模式,0为正常模式,1为饥饿模式。
mutexWaiterShift = iota // 3,从倒数第四位往前的 bit 位表示在排队等待的 goroutine 数
starvationThresholdNs = 1e6 // 1ms 切换到饥饿模式的阈值
)
Mutex
有两种模式:
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。
引入饥饿模式的目的是保证互斥锁的公平性。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
在 sync
包中定义了 Locker
接口:
// sync/mutex.go 31 行
type Locker interface {
Lock()
Unlock()
}
Mutex
实现了 Locker
接口。除了互斥锁 Mutex
之外,读写锁 RWMutex
也实现了 Locker
接口。
互斥锁的加锁是靠 Mutex.Lock
方法完成的,以下代码进行了简化,省略了 race 相关代码,只保留主干部分:
// sync/mutex.go 72 行
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
可以看到,整个加锁过程分为 Fast path
和 Slow Path
。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
如果 m.state
为 0,说明当前锁为未锁定状态,将其设置为 1。这也是最简单的部分,直接通过一个 CAS 操作,尝试获取锁。
如果互斥锁的状态不是 0 时就会进入 Slow Path
。尝试通过自旋(Spining)等方式等待锁的释放,该方法的主体是一个非常大的 for 循环,这里将它分成几个部分介绍获取锁的过程:
1. 判断当前 Goroutine 能否进入自旋:
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:
runtime.sync_runtime_canSpin
需要返回 true:
P
并且处理的运行队列为空;// runtime/proc.go 6364 行
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
2. 通过自旋等待互斥锁的释放:
一旦当前 Goroutine 能够进入自旋就会调用 runtime.sync_runtime_doSpin
和 runtime.procyield
执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:
// runtime/proc.go 6381 行
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
// runtime/asm_386.s 574 行
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
3. 计算互斥锁的最新状态:
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state
字段中存储的不同信息 — mutexLocked
、mutexStarving
、mutexWoken
和 mutexWaiterShift
:
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
4. 更新互斥锁的状态并获取锁:
计算了新的互斥锁状态之后,会使用 CAS 函数更新状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
如果没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex
通过信号量保证资源不会被两个 Goroutine 获取。
runtime.sync_runtime_SemacquireMutex
会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock
的剩余代码也会继续执行。
其中还包含了状态切换的部分逻辑:
var waitStartTime int64
for { // for 循环里尝试获取锁
if waitStartTime == 0 { // waitStartTime 只有第一次执行时才会赋值
waitStartTime = runtime_nanotime()
}
// 如果等待时间超过 1ms 则切换到饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
}
相比之下互斥锁的解锁过程就比较简单,同样分为 Fast path
和 Slow path
。
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
该过程会先使用 atomic.AddInt32
函数快速解锁,这时会发生下面的两种情况:
Slow path
。func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
然后根据当前锁模式分别处理:
if new&mutexStarving == 0 { // 正常模式
} else { // 饥饿模式
}
在正常模式下,上述代码会使用如下所示的处理过程:
mutexLocked
、mutexStarving
、mutexWoken
状态不都为 0,那么当前方法可以直接返回,不需要唤醒其他等待者;runtime_Semrelease
唤醒等待者并移交锁的所有权;在饥饿模式下,上述代码会直接调用 runtime_Semrelease
将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态:
// runtime/sema.go 65行
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
atomic.Xadd(addr, 1)
if atomic.Load(&root.nwait) == 0 {
return
}
// Harder case: search for a waiter and wake it.
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // May be slow or even yield, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 {
goyield()
}
}
}
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
mutexLocked
加锁;mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;sync_runtime_SemacquireMutex
将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;互斥锁的解锁过程与之相比就比较简单:
Mutex.Lock
会直接抛出异常;mutexLocked
标志位;sync.runtime_Semrelease
如果您喜欢我的文章,请点击下面按钮随意打赏,您的支持是我最大的动力。
最新评论