Golang sync.Mutex源码解析

Go 语言在 sync 包中提供了用于同步的一些基本原语,sync.Mutex 就是其中最常用的一个。

1. 基本结构

Go 语言的 sync.Mutex 由两个字段 statesema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

// sync/mutex.go 25行
type Mutex struct {
    state int32
    sema  uint32
}

上述两个字段加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。

状态

互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLockedmutexWokenmutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:

int32 中的不同位分别表示了不同的状态:

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数

在默认情况下,互斥锁的所有状态位都是 0,即默认为未锁定状态。同时也表明 Mutex 是不需要初始化的。

源码中也提供了相关常量:

// sync/mutex.go 36
const (
    mutexLocked = 1 << iota // 1 0001 含义:用最后一位表示当前对象锁的状态,0-未锁住 1-已锁住
    mutexWoken              // 2 0010 含义:用倒数第二位表示当前对象是否被唤醒 0-唤醒 1-未唤醒
    mutexStarving           // 4 0100 含义:用倒数第三位表示当前对象是否为饥饿模式,0为正常模式,1为饥饿模式。
    mutexWaiterShift = iota // 3,从倒数第四位往前的 bit 位表示在排队等待的 goroutine 数
    starvationThresholdNs = 1e6 // 1ms 切换到饥饿模式的阈值
)

正常模式和饥饿模式

Mutex 有两种模式:

  1. 正常模式;
  2. 饥饿模式。

在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。

引入饥饿模式的目的是保证互斥锁的公平性。

在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。

2. 加解锁过程

sync 包中定义了 Locker 接口:

// sync/mutex.go 31 行
type Locker interface {
    Lock()
    Unlock()
}

Mutex 实现了 Locker 接口。除了互斥锁 Mutex 之外,读写锁 RWMutex 也实现了 Locker 接口。

Lock

互斥锁的加锁是靠 Mutex.Lock 方法完成的,以下代码进行了简化,省略了 race 相关代码,只保留主干部分:

// sync/mutex.go 72 行
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

可以看到,整个加锁过程分为 Fast pathSlow Path

Fast path

if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    return
}

如果 m.state 为 0,说明当前锁为未锁定状态,将其设置为 1。这也是最简单的部分,直接通过一个 CAS 操作,尝试获取锁。

Slow Path

如果互斥锁的状态不是 0 时就会进入 Slow Path。尝试通过自旋(Spining)等方式等待锁的释放,该方法的主体是一个非常大的 for 循环,这里将它分成几个部分介绍获取锁的过程:

  1. 判断当前 Goroutine 能否进入自旋;
  2. 通过自旋等待互斥锁的释放;
  3. 计算互斥锁的最新状态;
  4. 更新互斥锁的状态并获取锁;

1. 判断当前 Goroutine 能否进入自旋:

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:

  1. 互斥锁只有在普通模式才能进入自旋;
  2. runtime.sync_runtime_canSpin 需要返回 true:
    • 运行在多 CPU 的机器上;
    • 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
    • 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
// runtime/proc.go 6364 行
func sync_runtime_canSpin(i int) bool {
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

2. 通过自旋等待互斥锁的释放:

一旦当前 Goroutine 能够进入自旋就会调用 runtime.sync_runtime_doSpinruntime.procyield 执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

// runtime/proc.go 6381 行
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

// runtime/asm_386.s 574 行
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ     again
    RET

3. 计算互斥锁的最新状态:

处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLockedmutexStarvingmutexWokenmutexWaiterShift

new := old
if old&mutexStarving == 0 {
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
    new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}
if awoke {
    if new&mutexWoken == 0 {
        throw("sync: inconsistent mutex state")
    }
    new &^= mutexWoken
}

4. 更新互斥锁的状态并获取锁:

计算了新的互斥锁状态之后,会使用 CAS 函数更新状态:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    if old&mutexStarving != 0 {
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        if !starving || old>>mutexWaiterShift == 1 {
            delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)
        break
    }
    awoke = true
    iter = 0
} else {
    old = m.state
}

如果没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex 通过信号量保证资源不会被两个 Goroutine 获取。

runtime.sync_runtime_SemacquireMutex 会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock 的剩余代码也会继续执行。

  • 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
  • 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;

其中还包含了状态切换的部分逻辑:

var waitStartTime int64
for { // for 循环里尝试获取锁
    if waitStartTime == 0 { // waitStartTime 只有第一次执行时才会赋值
        waitStartTime = runtime_nanotime()
    }
    // 如果等待时间超过 1ms 则切换到饥饿模式
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
}

Unlock

相比之下互斥锁的解锁过程就比较简单,同样分为 Fast pathSlow path

func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

Fast path

该过程会先使用 atomic.AddInt32 函数快速解锁,这时会发生下面的两种情况:

  • 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
  • 如果该函数返回的新状态不等于 0,则进入 Slow path

Slow path

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}

先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。

if (new+mutexLocked)&mutexLocked == 0 {
    throw("sync: unlock of unlocked mutex")
}

然后根据当前锁模式分别处理:

if new&mutexStarving == 0 { // 正常模式
    
} else { // 饥饿模式
    
}

在正常模式下,上述代码会使用如下所示的处理过程:

  • 如果互斥锁不存在等待者或者互斥锁的 mutexLockedmutexStarvingmutexWoken 状态不都为 0,那么当前方法可以直接返回,不需要唤醒其他等待者;
  • 如果互斥锁存在等待者,会通过 runtime_Semrelease 唤醒等待者并移交锁的所有权;

在饥饿模式下,上述代码会直接调用 runtime_Semrelease 将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态:

// runtime/sema.go 65行
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    semrelease1(addr, handoff, skipframes)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    if atomic.Load(&root.nwait) == 0 {
        return
    }

    // Harder case: search for a waiter and wake it.
    lockWithRank(&root.lock, lockRankRoot)
    if atomic.Load(&root.nwait) == 0 {
        unlock(&root.lock)
        return
    }
    s, t0 := root.dequeue(addr)
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    unlock(&root.lock)
    if s != nil { // May be slow or even yield, so unlock first
        acquiretime := s.acquiretime
        if acquiretime != 0 {
            mutexevent(t0-acquiretime, 3+skipframes)
        }
        if s.ticket != 0 {
            throw("corrupted semaphore ticket")
        }
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        readyWithTime(s, 5+skipframes)
        if s.ticket == 1 && getg().m.locks == 0 {
            goyield()
        }
    }
}

3. 小结

互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过 sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;

互斥锁的解锁过程与之相比就比较简单:

  • 当互斥锁已经被解锁时,调用 Mutex.Lock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease

打 赏