Go 语言并发编程、同步原语与锁 您所在的位置:网站首页 golang并发编程停止所有 Go 语言并发编程、同步原语与锁

Go 语言并发编程、同步原语与锁

#Go 语言并发编程、同步原语与锁 | 来源: 网络整理| 查看: 265

Go 语言并发编程、同步原语与锁 6.2 同步原语与锁 #

各位读者朋友,很高兴大家通过本博客学习 Go 语言,感谢一路相伴!《Go语言设计与实现》的纸质版图书已经上架京东,有需要的朋友请点击 链接 购买。

Go 语言作为一个原生支持用户态进程(Goroutine)的语言,当提到并发编程、多线程编程时,往往都离不开锁这一概念。锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。

本节会介绍 Go 语言中常见的同步原语 sync.Mutex、sync.RWMutex、sync.WaitGroup、sync.Once 和 sync.Cond 以及扩展原语 golang/sync/errgroup.Group、golang/sync/semaphore.Weighted 和 golang/sync/singleflight.Group 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。

6.2.1 基本原语 #

Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的 sync.Mutex、sync.RWMutex、sync.WaitGroup、sync.Once 和 sync.Cond:

golang-basic-sync-primitives

图 6-5 基本同步原语

这些基本原语提供了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级更高的 Channel 实现同步。

Mutex #

Go 语言的 sync.Mutex 由两个字段 state 和 sema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

type Mutex struct { state int32 sema uint32 }

上述两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。

状态 #

互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:

golang-mutex-state

图 6-6 互斥锁的状态

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

mutexLocked — 表示互斥锁的锁定状态;mutexWoken — 表示从正常模式被从唤醒;mutexStarving — 当前的互斥锁进入饥饿状态;waitersCount — 当前互斥锁上等待的 Goroutine 个数;正常模式和饥饿模式 #

sync.Mutex 有两种模式 — 正常模式和饥饿模式。我们需要在这里先了解正常模式和饥饿模式都是什么以及它们有什么样的关系。

在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。

golang-mutex-mode

图 6-7 互斥锁的正常模式与饥饿模式

饥饿模式是在 Go 语言在 1.9 中通过提交 sync: make Mutex more fair 引入的优化1,引入的目的是保证互斥锁的公平性。

在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。

加锁和解锁 #

我们在这一节中将分别介绍互斥锁的加锁和解锁过程,它们分别使用 sync.Mutex.Lock 和 sync.Mutex.Unlock 方法。

互斥锁的加锁是靠 sync.Mutex.Lock 完成的,最新的 Go 语言源代码中已经将 sync.Mutex.Lock 方法进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked 位置成 1:

func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } m.lockSlow() }

如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:

判断当前 Goroutine 能否进入自旋;通过自旋等待互斥锁的释放;计算互斥锁的最新状态;更新互斥锁的状态并获取锁;

我们先来介绍互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放:

func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:

互斥锁只有在普通模式才能进入自旋;runtime.sync_runtime_canSpin 需要返回 true:运行在多 CPU 的机器上;当前 Goroutine 为了获取该锁进入自旋的次数小于四次;当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;

一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

func sync_runtime_doSpin() { procyield(active_spin_cnt) } TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET

处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 >mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1 0 || w == 0 { return } *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }

sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的计数器 counter。虽然 sync.WaitGroup.Add 方法传入的参数可以为负数,但是计数器只能是非负数,一旦出现负数就会发生程序崩溃。当调用计数器归零,即所有任务都执行完成时,才会通过 sync.runtime_Semrelease 唤醒处于等待状态的 Goroutine。

sync.WaitGroup 的另一个方法 sync.WaitGroup.Wait 会在计数器大于 0 并且不存在等待的 Goroutine 时,调用 runtime.sync_runtime_Semacquire 陷入睡眠。

func (wg *WaitGroup) Wait() { statep, semap := wg.state() for { state := atomic.LoadUint64(statep) v := int32(state >> 32) if v == 0 { return } if atomic.CompareAndSwapUint64(statep, state, state+1) { runtime_Semacquire(semap) if +statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }

当 sync.WaitGroup 的计数器归零时,陷入睡眠状态的 Goroutine 会被唤醒,上述方法也会立刻返回。

小结 #

通过对 sync.WaitGroup 的分析和研究,我们能够得出以下结论:

sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,我们可以向 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine;可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒;Once #

Go 语言标准库中 sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果:

func main() { o := &sync.Once{} for i := 0; i < 10; i++ { o.Do(func() { fmt.Println("only once") }) } } $ go run main.go only once 结构体 #

每一个 sync.Once 结构体中都只包含一个用于标识代码块是否执行过的 done 以及一个互斥锁 sync.Mutex:

type Once struct { done uint32 m Mutex } 接口 #

sync.Once.Do 是 sync.Once 结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:

如果传入的函数已经执行过,会直接返回;如果传入的函数没有执行过,会调用 sync.Once.doSlow 执行传入的函数:func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { o.doSlow(f) } } func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } } 为当前 Goroutine 获取互斥锁;执行传入的无入参函数;运行延迟函数调用,将成员变量 done 更新成 1;

sync.Once 会通过成员变量 done 确保函数不会执行第二次。

小结 #

作为用于保证函数执行次数的 sync.Once 结构体,它使用互斥锁和 sync/atomic 包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。在使用该结构体时,我们也需要注意以下的问题:

sync.Once.Do 方法中传入的函数只会被执行一次,哪怕函数中发生了 panic;两次调用 sync.Once.Do 方法传入不同的函数只会执行第一次调传入的函数;Cond #

Go 语言标准库中还包含条件变量 sync.Cond,它可以让一组的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond 结构体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法:

var status int64 func main() { c := sync.NewCond(&sync.Mutex{}) for i := 0; i < 10; i++ { go listen(c) } time.Sleep(1 * time.Second) go broadcast(c) ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) = n && s.waiters.Len() == 0 { s.cur += n return nil } ... ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) select { case


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有