go-sync-mutex

Sync

​ Go 语言作为一个原生支持用户态进程(Goroutine)的语言,当提到并发编程、多线程编程时,往往都离不开锁这一概念。锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。

通过atomic.CompareAndSwapInt32调用汇编CAS(compare and swap)指令的原子性来实现临界区的互斥访问,保证只有一个协程获取到锁

​ 当其中一个 goroutine 获得了这个锁,其他 goroutine 尝试获取这个锁时将会被阻塞,直到持有锁的 goroutine 释放锁为止。

​ Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond

!Mutex互斥锁

​ Go 语言的 sync.Mutex 由两个字段 statesema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

type Mutex struct {state int32sema uint32		// 指针地址 0xF,存着结构体的地址
}

Mutex.state

状态字段

int32类型的state代表:

  • locked: 锁状态 1被锁 0未被锁

  • woken:1是否有goroutine模式被唤醒,0未被唤醒

  • starving:1进入饥饿模式,0正常模式

  • 其他位:代表获取锁的等待队列中的协程数,state是int32类型,说明是32bit,其余位是32-3 bits,所以最大排队协程数就是2^(32-3)

锁模式

  • 正常模式:队头和新协程的抢占,未抢占到的扔到队尾
  • 饥饿模式:按顺序获取锁,不得插队,防止队尾一直阻塞等待
正常模式

在正常模式下获取锁:

  1. 多线程下竞争锁,获取成功返回,修改sync.Mutex结构体字段。获取失败,自旋等待其他线程释放锁,4次之后仍然拿不到锁,goroutine加入到等待队列尾部,状态改成_GWaiting
  2. 获取到锁的线程释放锁,从等待队列头部唤醒一个Goroutine,状态改成_Grunning,他会和新创建并且获取锁的新goroutine(M正在运行的g_Grunning)争抢锁。
    1. 如果被唤醒的G仍然未能抢到锁,goroutine加入到等待队列头部,状态改成_GWaiting
    2. 如果被唤醒的G抢到锁,新创建的G相当于重新进入1步骤
饥饿模式

在饥饿模式下获取锁:

互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

锁模型切换
  • 正常模式切换到饥饿模式:被唤醒的 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
  • 饥饿模式换到正常模式切:
    • 一个 Goroutine 获得了互斥锁并且它在队列的末尾,说明没有协程在竞争了,切换到正常模式
    • 被唤醒的 Goroutine 获得锁没超过 1ms ,切换到正常模式

Mutex.Sema

控制锁状态的信号量(互斥信号量)

// runtime/sema.go
type semaRoot struct {lock  mutextreap *sudog // 锁抢占者的 平衡树的根nwait uint32 // 抢占锁的goroutine的数量
}

互斥锁加锁/解锁

  • func (m *Mutex) Lock():Lock方法锁住m,如果m已经加锁,则阻塞直到m解锁。
func (m *Mutex) Lock() {// 未锁状态,获取锁returnif atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}// Slow path (outlined so that the fast path can be inlined)m.lockSlow()
}func (m *Mutex) lockSlow() {var waitStartTime int64	// 协程抢占锁时间,时间超出,锁变成饥饿模式starving := falseawoke := falseiter := 0old := m.statefor {// 锁住状态下 and 不是饥模式 and 在可自旋次数下 进入if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// awoke标记是false and 锁非唤醒状态 and 锁的等待者大于0  // 满足这些条件,把锁变成唤醒状态// awoke flag标记成trueif !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}// 自旋 汇编runtime_doSpin()// 累计自选次数iter++// 把唤醒状态 覆盖 oldold = m.statecontinue}// 可能其他协程更改了锁状态:改成了`未锁住状态` // 以下操作就有AB两种情况// A情况: 锁住状态 且 饥饿模式 (自旋次数超过4次)// B情况: 未锁住//拿到最新锁状态new := old// old不是饥饿模式(排除A情况),那是B情况,把new设置成锁状态if old&mutexStarving == 0 {new |= mutexLocked}// old 是 锁住状态 或 是饥饿模式。// 等待数+1 (当前协程加入等待)if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// 饥饿标识非空 and old是锁住状态。 (第一次进入 且 A情况)// new设置成饥饿状态if starving && old&mutexLocked != 0 {new |= mutexStarving}// awoke标识是 唤醒状态if awoke {// new不是唤醒状态,锁标识不对,panicif new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}// &^ 想异的位保留,相同的位清0。 非唤醒状态 变成 唤醒, 唤醒状态下变成非唤醒new &^= mutexWoken}// 此时new的3个字段状态 : 锁住,饥饿,唤醒状态未知// 如果状态没有被其他协程改变,状态更改成newif atomic.CompareAndSwapInt32(&m.state, old, new) {// 如果状态是非锁住 and 非饥饿模式 // compareAndSwapInt32已经改成锁住,break forif old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// 设置排队者的开始等待时间queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}// 信号量设置,阻塞等待(信号量的P操作,协程间通信)runtime_SemacquireMutex(&m.sema, queueLifo, 1)// 标记 饥饿标识, 如果是饥饿标识是true 或者 大于饥饿阈值 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs// 获取最新锁状态,虽然前面compareAndSwap已经改成了m.state : 锁住,饥饿,唤醒状态未知。但是前面阻塞有可能其他协程更改了状态old = m.state// 锁是饥饿模式if old&mutexStarving != 0 {// 锁是 锁住状态 或者 唤醒状态 或者 等待者为0个时// 抛出if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}// delta := int32(mutexLocked - 1<<mutexWaiterShift)// 非贪婪模式 或则 等待者为1时if !starving || old>>mutexWaiterShift == 1 {delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state}}if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}
  • func (m *Mutex) Unlock():Unlock方法解锁m,如果m未加锁会导致运行时错误。锁和线程无关,可以由不同的线程加锁和解锁。
func (m *Mutex) Unlock() {if race.Enabled {_ = m.staterace.Release(unsafe.Pointer(m))}// Fast path: drop lock bit.new := atomic.AddInt32(&m.state, -mutexLocked)if new != 0 {// Outlined slow path to allow inlining the fast path.// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.m.unlockSlow(new)}
}func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")}if new&mutexStarving == 0 {old := newfor {if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}// Grab the right to wake someone.new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)return}old = m.state}} else {// 信号量中的V操作runtime_Semrelease(&m.sema, true, 1)}
}

信号量:信号量有两种原子操作,他们必须成对出现
P操作:信号量 减1,当信号量 <0 ,表明资源被占用,进程阻塞。 当信号量>=0,表明资源被释放(可用),进程可继续执行
V操作:信号量加1,当信号量<=0时,代表有阻塞中进程。当信号量>0,表明没有阻塞中进程,无需操作
互斥信号量,默认值为1
————————————————
版权声明:本文为CSDN博主「我是你的小阿磊」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qiu18610714529/article/details/109062176

example

import "sync"func main() {m := sync.Mutex{}go user1(&m)go user2(&m)signalChan := make(chan os.Signal, 1)signal.Notify(signalChan, os.Interrupt)select {case <-signalChan:fmt.Println("catch interrupt signal")break}
}func printer(str string, m *sync.Mutex) {m.Lock()         //加锁defer m.Unlock() //解锁for _, ch := range str {fmt.Printf("%c", ch)time.Sleep(time.Millisecond * 1)}
}
func user1(m *sync.Mutex) {printer("hello ", m)
}
func user2(m *sync.Mutex) {printer("world", m)
}//打印结果
worldhello 或者 helloworld: 两个单词是有序的,不像`heworllldo`两个协程同时打印,说明某个协程会在mutex.Lock()进行自旋等待获取锁

RWMutex读写互斥锁

读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

type RWMutex struct {w           Mutex  // held if there are pending writerswriterSem   uint32 // semaphore for writers to wait for completing readersreaderSem   uint32 // semaphore for readers to wait for completing writersreaderCount int32  // number of pending readersreaderWait  int32  // number of departing readers
}
  • w — 复用互斥锁提供的能力;
  • writerSemreaderSem — 分别用于写等待读和读等待写:
  • readerCount 存储了当前正在执行的读操作数量;
  • readerWait 表示当写操作被阻塞时等待的读操作个数;

加锁/解锁

  • func (rw *RWMutex) RLock() :读加锁,如果有写锁,则阻塞等待

    func (rw *RWMutex) RLock() {if race.Enabled {_ = rw.w.staterace.Disable()}if atomic.AddInt32(&rw.readerCount, 1) < 0 {// 阻塞,等待信号量的v操作释放共享内存,才能获得执行权runtime_SemacquireMutex(&rw.readerSem, false, 0)}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(&rw.readerSem))}
    }
    
  • func (rw *RWMutex) RUnlock():解读锁,

    func (rw *RWMutex) RUnlock() {if race.Enabled {_ = rw.w.staterace.ReleaseMerge(unsafe.Pointer(&rw.writerSem))race.Disable()}if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {// Outlined slow-path to allow the fast-path to be inlinedrw.rUnlockSlow(r)}if race.Enabled {race.Enable()}
    }func (rw *RWMutex) rUnlockSlow(r int32) {if r+1 == 0 || r+1 == -rwmutexMaxReaders {race.Enable()throw("sync: RUnlock of unlocked RWMutex")}// A writer is pending.if atomic.AddInt32(&rw.readerWait, -1) == 0 {// The last reader unblocks the writer.runtime_Semrelease(&rw.writerSem, false, 1)}
    }
    
  • func (rw *RWMutex) Lock(): 写锁,如果有读写锁被占用,阻塞等待所有读写锁释放后才能获得

    • 其他 Goroutine 在获取写锁时会进入自旋或者休眠
    • 有其他 Goroutine 持有互斥锁的读锁该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒;
func (rw *RWMutex) Lock() {if race.Enabled {_ = rw.w.staterace.Disable()}// First, resolve competition with other writers.rw.w.Lock()// Announce to readers there is a pending writer.r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders// Wait for active readers.if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(&rw.readerSem))race.Acquire(unsafe.Pointer(&rw.writerSem))}
}

example

func RMutex() {ch := make(chan struct{})rw := &sync.RWMutex{}go func() {rw.RLock()time.Sleep(time.Second * 5)defer rw.RUnlock()fmt.Println("fun1")}()go func() {time.Sleep(time.Millisecond * 500)rw.Lock()defer rw.Unlock()fmt.Println("fun2")close(ch)}()<-ch
}// 先打印出fun1 再打印fun2 代表了读写互斥

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/161267.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ELK + kafka 日志方案

概述 本文介绍使用ELK&#xff08;elasticsearch、logstash、kibana&#xff09;  kafka来搭建一个日志系统。主要演示使用spring aop进行日志收集&#xff0c;然后通过kafka将日志发送给logstash&#xff0c;logstash再将日志写入elasticsearch&#xff0c;这样elasticsearc…

【Linux】Nignx及负载均衡动静分离

一、Nginx简介 1.1 什么是nginx? Nginx&#xff08;发音同"engine x"&#xff09;是一个高性能的反向代理和 Web 服务器软件&#xff0c;最初是由俄罗斯人 Igor Sysoev 开发的。Nginx 的第一个版本发布于 2004 年&#xff0c;其源代码基于双条款 BSD 许可证发布&am…

JavaScript_Date对象_实例方法_set类

设置一年后的今天&#xff1a; <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <title>Document</…

Asterisk Ubuntu 安装

更新环境 sudo apt update sudo apt install wget build-essential git autoconf subversion pkg-config libtool sudo contrib/scripts/get_mp3_source.sh A addons/mp3 A addons/mp3/common.c A addons/mp3/huffman.h A addons/mp3/tabinit.c A addons/mp3/Ma…

MySQL连接时出现Host ‘::1‘ is not allowed to connect to this MySQL server

报错原因 之前想着要提高一下连接速度&#xff0c;所以在my.ini中加入了&#xff1a;skip-name-resolve&#xff0c;当时的数据库root账号设置的登录权限是%&#xff0c;因此没有出现连接错误&#xff0c;这次因为是新建数据库&#xff0c;root账号的登录权限默认是localhost&…

非线性【SVM】的创建和使用

先来绘制散点图&#xff1a; from sklearn.datasets import make_circles X,y make_circles(100, factor0.1, noise.1) # 100个样本&#xff0c;factor:内圈和外圈的距离之比&#xff0c;noise:噪声 X.shape y.shape plt.scatter(X[:,0],X[:,1],cy,s50,cmap"rainbow&qu…

在IDEA运行spark程序(搭建Spark开发环境)

建议大家写在Linux上搭建好Hadoop的完全分布式集群环境和Spark集群环境&#xff0c;以下在IDEA中搭建的环境仅仅是在window系统上进行spark程序的开发学习&#xff0c;在window系统上可以不用安装hadoop和spark&#xff0c;spark程序可以通过pom.xml的文件配置&#xff0c;添加…

【C/C++笔试练习】new和deleted底层原理、静态数据成员、运算符重载、只能使用new创建的类、模版声明、另类加法、走方格的方案数

文章目录 C/C笔试练习选择部分&#xff08;1&#xff09;new和deleted底层原理&#xff08;2&#xff09;静态数据成员&#xff08;3&#xff09;运算符重载&#xff08;4&#xff09;程序分析&#xff08;5&#xff09;静态数据成员&#xff08;6&#xff09;只能使用new创建的…

基于java+springboot+vue城市轨道交通线路查询系统-公交车线路查询

项目介绍 本系统是针对目前交通管理的实际需求&#xff0c;从实际工作出发&#xff0c;对过去的市轨道交通线路查询系统存在的问题进行分析&#xff0c;完善用户的使用体会。采用计算机系统来管理信息&#xff0c;取代人工管理模式&#xff0c;查询便利&#xff0c;信息准确率…

视频列表:点击某个视频进行播放,其余视频全部暂停(同时只播放一个视频)

目录 需求实现原理实现代码页面展示 需求 视频列表&#xff1a;点击某个视频进行播放&#xff0c;其余视频全部暂停&#xff08;同时只播放一个视频&#xff09; 实现原理 在 video 标签添加 自定义属性 id (必须唯一)给每个 video 标签 添加 play 视频播放事件播放视频时&…

【LeetCode力扣】287.寻找重复数

1、题目介绍 原题链接&#xff1a;287. 寻找重复数 - 力扣&#xff08;LeetCode&#xff09; 示例 1&#xff1a; 输入&#xff1a;nums [1,3,4,2,2] 输出&#xff1a;2示例 2&#xff1a; 输入&#xff1a;nums [3,1,3,4,2] 输出&#xff1a;3提示&#xff1a; 1 < n &l…

Hadoop环境搭建及Demo

参考博客 Windows 10安装Hadoop 3.3.0教程 (kontext.tech) Hadoop入门篇——伪分布模式安装 & WordCount词频统计 | Liu Baoshuai’s Blog Hadoop安装教程 Linux版_linux和hadoop的安装_lnlnldczxy的博客-CSDN博客 hadoop启动出错 The value of property bind.address …