在go里面,已经把线程概念抽象完了,几乎没有提供关于线程的操作,在go程序,所有操作都是协程粒度的以前用cpp写的简单协程库,那时候在考虑协程锁怎么实现,如果直接用线程锁的话,那跑在这一个线程上的协程都动不了,所以我们需要一个小粒度的,作用在协程的锁,go是一门只有协程的语言,线程只能由runtime来管理,所以他的锁必然是协程粒度
这段代码是 Go 语言标准库中 sync
包中的 Mutex
类型的实现。
Mutex
结构体定义:gotype Mutex struct {
state int32
sema uint32
}
state
用于表示互斥锁的状态,其中 mutexLocked
标志表示锁已被某个 goroutine 锁定。sema
是一个信号量,用于在等待锁的 goroutine 之间进行通信。goconst (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
starvationThresholdNs = 1e6
)
Lock
方法:gofunc (m *Mutex) Lock() {
// Fast path: 尝试直接获取锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path: 锁已被其他 goroutine 占用,调用 lockSlow 进行处理
m.lockSlow()
}
func (m *Mutex) lockSlow() {
// 记录等待开始的时间,用于判断是否进入饥饿模式
var waitStartTime int64
// 标记是否处于饥饿模式
starving := false
// 标记是否被唤醒
awoke := false
// 记录自旋次数
iter := 0
// 获取当前锁的状态
old := m.state
for {
// 如果锁处于饥饿模式且正在自旋,则不进行自旋,因为锁的所有权已经交给了等待者
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 主动自旋有意义
// 尝试设置 mutexWoken 标志,以通知 Unlock 不要唤醒其他阻塞的 goroutines
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 如果锁不处于饥饿模式,设置新的状态为已锁定
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 当前 goroutine 将锁切换到饥饿模式
// 但是如果锁当前处于未锁定状态,则不切换
// Unlock 预期处于饥饿模式的锁有等待者,而在这种情况下不为真
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果 goroutine 被唤醒,需要重置标志位
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 尝试用 CAS 更新锁的状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果锁之前未锁定,则成功获取锁并退出循环
if old&(mutexLocked|mutexStarving) == 0 {
break // 用 CAS 锁住了 mutex
}
// 如果之前已经在等待了,则排队在队列的前面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 等待获取锁
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 判断是否需要进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁处于饥饿模式,则进行相应的处理
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 修复锁的状态,解决饥饿模式下的不一致性
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
// 记录 goroutine 获取锁的操作,用于竞争检测
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Lock
方法用于获取锁。如果锁已被其他 goroutine 占用,就调用 lockSlow
方法进行处理。TryLock
方法:gofunc (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
TryLock
方法尝试获取锁,成功返回 true
,失败返回 false
。如果锁已被占用或处于饥饿模式,获取锁失败。Unlock
方法:gofunc (m *Mutex) Unlock() {
// 快速路径:释放锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 慢速路径:调用 unlockSlow 处理
m.unlockSlow(new)
}
}
Unlock
方法用于释放锁。如果锁的状态为零,表示成功释放锁。否则,调用 unlockSlow
方法进行处理。unlockSlow
方法:gofunc (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
// 非饥饿模式:唤醒等待的 goroutine
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式:将锁的所有权移交给等待的下一个 goroutine,并让出时间片
runtime_Semrelease(&m.sema, true, 1)
}
}
unlockSlow
方法处理释放锁的慢速路径。在非饥饿模式下,它尝试唤醒等待的 goroutine。在饥饿模式下,它直接将锁的所有权移交给等待的下一个 goroutine,并让出时间片。这段代码实现了互斥锁的基本功能,并考虑了锁的饥饿模式。饥饿模式是一种优化,用于防止某些情况下的锁竞争。
参考 1.https://mp.weixin.qq.com/s/QLzJ4sXaggPga2biQdExOA 2.https://mp.weixin.qq.com/s/D9Zgh2pm6hqqbIOkefrNcw
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!