编辑
2023-12-11
后端
00
请注意,本文编写于 516 天前,最后修改于 516 天前,其中某些信息可能已经过时。

在go里面,已经把线程概念抽象完了,几乎没有提供关于线程的操作,在go程序,所有操作都是协程粒度的以前用cpp写的简单协程库,那时候在考虑协程锁怎么实现,如果直接用线程锁的话,那跑在这一个线程上的协程都动不了,所以我们需要一个小粒度的,作用在协程的锁,go是一门只有协程的语言,线程只能由runtime来管理,所以他的锁必然是协程粒度

这段代码是 Go 语言标准库中 sync 包中的 Mutex 类型的实现。

Mutex 结构体定义:

go
type Mutex struct { state int32 sema uint32 }
  • state 用于表示互斥锁的状态,其中 mutexLocked 标志表示锁已被某个 goroutine 锁定。
  • sema 是一个信号量,用于在等待锁的 goroutine 之间进行通信。

锁状态

go
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota // Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency. starvationThresholdNs = 1e6 )

Lock 方法:

go
func (m *Mutex) Lock() { // Fast path: 尝试直接获取锁 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path: 锁已被其他 goroutine 占用,调用 lockSlow 进行处理 m.lockSlow() } func (m *Mutex) lockSlow() { // 记录等待开始的时间,用于判断是否进入饥饿模式 var waitStartTime int64 // 标记是否处于饥饿模式 starving := false // 标记是否被唤醒 awoke := false // 记录自旋次数 iter := 0 // 获取当前锁的状态 old := m.state for { // 如果锁处于饥饿模式且正在自旋,则不进行自旋,因为锁的所有权已经交给了等待者 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 主动自旋有意义 // 尝试设置 mutexWoken 标志,以通知 Unlock 不要唤醒其他阻塞的 goroutines if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // 如果锁不处于饥饿模式,设置新的状态为已锁定 if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 当前 goroutine 将锁切换到饥饿模式 // 但是如果锁当前处于未锁定状态,则不切换 // Unlock 预期处于饥饿模式的锁有等待者,而在这种情况下不为真 if starving && old&mutexLocked != 0 { new |= mutexStarving } // 如果 goroutine 被唤醒,需要重置标志位 if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // 尝试用 CAS 更新锁的状态 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果锁之前未锁定,则成功获取锁并退出循环 if old&(mutexLocked|mutexStarving) == 0 { break // 用 CAS 锁住了 mutex } // 如果之前已经在等待了,则排队在队列的前面 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 等待获取锁 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 判断是否需要进入饥饿模式 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 如果锁处于饥饿模式,则进行相应的处理 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 修复锁的状态,解决饥饿模式下的不一致性 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 退出饥饿模式 delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } // 记录 goroutine 获取锁的操作,用于竞争检测 if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
  • Lock 方法用于获取锁。如果锁已被其他 goroutine 占用,就调用 lockSlow 方法进行处理。
  1. TryLock 方法:
go
func (m *Mutex) TryLock() bool { old := m.state if old&(mutexLocked|mutexStarving) != 0 { return false } if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) { return false } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return true }
  • TryLock 方法尝试获取锁,成功返回 true,失败返回 false。如果锁已被占用或处于饥饿模式,获取锁失败。
  1. Unlock 方法:
go
func (m *Mutex) Unlock() { // 快速路径:释放锁 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 慢速路径:调用 unlockSlow 处理 m.unlockSlow(new) } }
  • Unlock 方法用于释放锁。如果锁的状态为零,表示成功释放锁。否则,调用 unlockSlow 方法进行处理。
  1. unlockSlow 方法:
go
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { // 非饥饿模式:唤醒等待的 goroutine old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 饥饿模式:将锁的所有权移交给等待的下一个 goroutine,并让出时间片 runtime_Semrelease(&m.sema, true, 1) } }
  • unlockSlow 方法处理释放锁的慢速路径。在非饥饿模式下,它尝试唤醒等待的 goroutine。在饥饿模式下,它直接将锁的所有权移交给等待的下一个 goroutine,并让出时间片。

这段代码实现了互斥锁的基本功能,并考虑了锁的饥饿模式。饥饿模式是一种优化,用于防止某些情况下的锁竞争。


参考 1.https://mp.weixin.qq.com/s/QLzJ4sXaggPga2biQdExOA 2.https://mp.weixin.qq.com/s/D9Zgh2pm6hqqbIOkefrNcw

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!