<kbd id="5sdj3"></kbd>
<th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>

    你真的了解 sync.Mutex嗎

    共 6846字,需瀏覽 14分鐘

     ·

    2021-02-20 15:11

    Mutex是一個互斥的排他鎖,零值Mutex為未上鎖狀態(tài),Mutex一旦被使用 禁止被拷貝。使用起來也比較簡單

    package?main

    import?"sync"

    func?main()?{
    ?m?:=?sync.Mutex{}
    ?m.Lock()
    ?defer?m.Unlock()
    ??//?do?something
    }

    Mutex有兩種操作模式:

    • 正常模式(非公平模式)

    阻塞等待的goroutine保存在FIFO的隊列中,喚醒的goroutine不直接擁有鎖,需要與新來的goroutine競爭獲取鎖。因為新來的goroutine很多已經(jīng)占有了CPU,所以喚醒的goroutine在競爭中很容易輸;但如果一個goroutine獲取鎖失敗超過1ms,則會將Mutex切換為饑餓模式。

    • 饑餓模式(公平模式)

    這種模式下,直接將等待隊列隊頭goroutine解鎖goroutine;新來的gorountine也不會嘗試獲得鎖,而是直接插入到等待隊列隊尾。

    mutex mode

    如果一個goroutine獲得了鎖,并且他在等待隊列隊尾 或者 他等待小于1ms,則會將Mutex的模式切換回正常模式。正常模式有更好的性能,新來的goroutine通過幾次競爭可以直接獲取到鎖,盡管當(dāng)前仍有等待的goroutine。而饑餓模式則是對正常模式的補充,防止等待隊列中的goroutine永遠沒有機會獲取鎖。

    其數(shù)據(jù)結(jié)構(gòu)為:

    type?Mutex?struct?{
    ?state?int32?//?鎖競爭的狀態(tài)值
    ?sema??uint32?//?信號量
    }

    state代表了當(dāng)前鎖的狀態(tài)、 是否是存在自旋、是否是饑餓模式、阻塞goroutine數(shù)量

    ?mutexLocked?=?1?<iota?//?mutex?is?locked
    ?mutexWoken
    ?mutexStarving
    ?mutexWaiterShift?=?iota

    mutex state

    mutex.state & mutexLocked 加鎖狀態(tài) 1 表示已加鎖 0 表示未加鎖

    mutex.state & mutexWoken ?喚醒狀態(tài) 1 表示已喚醒狀態(tài) 0 表示未喚醒

    mutex.state & mutexStarving ?饑餓狀態(tài) ?1 表示饑餓狀態(tài)?0表示正常狀態(tài)

    mutex.state >> mutexWaiterShift得到當(dāng)前goroutine數(shù)目

    Lock

    上鎖大致分為fast-pathslow-path

    Fast-path

    lock通過調(diào)用atomic.CompareAndSwapInt32來競爭更新m.state,成功則獲得鎖;失敗,則進入slow-path

    func?(m?*Mutex)?Lock()?{
    ?//?Fast?path:?grab?unlocked?mutex.
    ?if?atomic.CompareAndSwapInt32(&m.state,?0,?mutexLocked)?{
    ??if?race.Enabled?{
    ???race.Acquire(unsafe.Pointer(m))
    ??}
    ??return
    ?}
    ?//?Slow?path?(outlined?so?that?the?fast?path?can?be?inlined)
    ?m.lockSlow()
    }

    atomic.CompareAndSwapInt32正如簽名一樣,進行比較交換操作,這過程是原子的

    //?CompareAndSwapInt32?executes?the?compare-and-swap?operation?for?an?int32?value.
    func?CompareAndSwapInt32(addr?*int32,?old,?new?int32)?(swapped?bool)

    源碼中我們并不能看到該函數(shù)的具體實現(xiàn),他的實現(xiàn)跟硬件平臺有關(guān),我們可以查看匯編代碼一窺究竟,go tool compile -S mutex.go也可以對二進制文件go tool objdump -s methodname binary

    	0x0036 00054 (loop.go:6)	MOVQ	AX, CX
    0x0039 00057 ($GOROOT/src/sync/mutex.go:74) XORL AX, AX
    0x003b 00059 ($GOROOT/src/sync/mutex.go:74) MOVL $1, DX
    0x0040 00064 ($GOROOT/src/sync/mutex.go:74) LOCK
    0x0041 00065 ($GOROOT/src/sync/mutex.go:74) CMPXCHGL DX, (CX)
    0x0044 00068 ($GOROOT/src/sync/mutex.go:74) SETEQ AL
    0x0047 00071 ($GOROOT/src/sync/mutex.go:74) TESTB AL, AL
    0x0049 00073 ($GOROOT/src/sync/mutex.go:74) JEQ 150
    0x004b 00075 (loop.go:8) MOVL $8, ""..autotmp_6+16(SP)
    0x0053 00083 (loop.go:8) LEAQ sync.(*Mutex).Unlock·f(SB), AX

    重點關(guān)注第5行CMPXCHGL DX, (CX)這個CMPXCHGL是x86和Intel架構(gòu)中的compare and exchange指令,Java的那套AtomicXX底層也是依賴這個指令來保證原子性操作的。

    所以我們看到Mutex是互斥排他鎖且不可重入,當(dāng)我們在一個goroutine獲取同一個鎖會導(dǎo)致死鎖。

    package?main

    import?"sync"

    func?main()?{
    ?m?:=?sync.Mutex{}
    ?m.Lock()
    ??//這里會導(dǎo)致死鎖
    ?m.Lock()
    ?defer?m.Unlock()
    }

    slow-path

    如果goroutinefast-path失敗,則調(diào)用m.lockSlow()進入slow-path,函數(shù)內(nèi)部主要是一個for{}死循環(huán),進入循環(huán)的goroutine大致分為兩類:

    • 新來的gorountine
    • 被喚醒的goroutine

    Mutex默認為正常模式,若新來的goroutine搶占成功,則另一個就需要阻塞等待;阻塞等待一旦超過閾值1ms則會將Mutex切換到饑餓模式,這個模式下新來的goroutine只能阻塞等待在隊列尾部,沒有搶占的資格。當(dāng)然等待阻塞->喚醒->參與搶占鎖,這個過程顯示不是很高效,所以這里有一個自旋的優(yōu)化

    當(dāng)mutex處于正常模式且能夠自旋,會讓當(dāng)前goroutine自旋等待,同時設(shè)置mutex.state的mutexWoken位為1,保證自旋等待的goroutine一定比新來goroutine更有優(yōu)先權(quán)。這樣unlock操作也會優(yōu)先保證自旋等待的goroutine獲取鎖

    golang對自旋做了些限制要求 需要:

    • 多核CPU
    • GOMAXPROCS>1
    • 至少有一個運行的P并且local的P隊列為空

    感興趣的可以跟下源碼比較簡單

    func?(m?*Mutex)?lockSlow()?{
    ?var?waitStartTime?int64
    ?starving?:=?false
    ?awoke?:=?false
    ?iter?:=?0
    ?old?:=?m.state
    ?for?{
    ????//饑餓模式下不能自旋,也沒有資格搶占,鎖是手遞手給到等待的goroutine
    ??if?old&(mutexLocked|mutexStarving)?==?mutexLocked?&&?runtime_canSpin(iter)?{//當(dāng)Mutex處于正常模式且能夠自旋
    ??????//設(shè)置mutexWoken為1?告訴unlock操作,存在自旋gorountine?unlock后不需要喚醒其他goroutine
    ???if?!awoke?&&?old&mutexWoken?==?0?&&?old>>mutexWaiterShift?!=?0?&&
    ????atomic.CompareAndSwapInt32(&m.state,?old,?old|mutexWoken)?{
    ????awoke?=?true
    ???}
    ???runtime_doSpin()
    ???iter++
    ???old?=?m.state
    ???continue
    ??}
    ??//??自旋完了?還是沒拿到鎖
    ??new?:=?old
    ????//當(dāng)mutex處于正常模式,將new的mutexLocked設(shè)置為1?即準(zhǔn)備搶占鎖
    ??if?old&mutexStarving?==?0?{
    ???new?|=?mutexLocked
    ??}
    ????//加鎖狀態(tài)或饑餓模式下?新來的goroutine進入等待隊列
    ??if?old&(mutexLocked|mutexStarving)?!=?0?{
    ???new?+=?1?<??}

    ????//將Mutex切換為饑餓模式,若未加鎖則不必切換
    ????//Unlock操作希望饑餓模式存在等待者
    ??if?starving?&&?old&mutexLocked?!=?0?{
    ???new?|=?mutexStarving
    ??}
    ??if?awoke?{
    ??????//?當(dāng)前goroutine自旋過?已被被喚醒,則需要將mutexWoken重置
    ???if?new&mutexWoken?==?0?{
    ????throw("sync:?inconsistent?mutex?state")
    ???}
    ???new?&^=?mutexWoken?//重置mutexWoken
    ??}
    ??if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
    ??????//?當(dāng)前goroutine獲取鎖前mutex處于未加鎖?正常模式下
    ???if?old&(mutexLocked|mutexStarving)?==?0?{
    ????break?//?使用CAS成功搶占到鎖
    ???}
    ???//?waitStartTime!=0表示當(dāng)前goroutine是等待狀態(tài)喚醒的?
    ??????//?為了與第一次調(diào)用Lock的goroutine劃分不同的優(yōu)先級
    ???queueLifo?:=?waitStartTime?!=?0
    ???if?waitStartTime?==?0?{
    ????????//開始記錄等待時間
    ????waitStartTime?=?runtime_nanotime()
    ???}
    ??????//?將被喚醒但是沒有獲得鎖的goroutine插入到當(dāng)前等待隊列隊首
    ??????//?使用信號量阻塞當(dāng)前goroutine
    ???runtime_SemacquireMutex(&m.sema,?queueLifo,?1)
    ??????//?當(dāng)goroutine等待時間超過starvationThresholdNs,mutex進入饑餓模式
    ???starving?=?starving?||?runtime_nanotime()-waitStartTime?>?starvationThresholdNs
    ???old?=?m.state
    ???if?old&mutexStarving?!=?0?{
    ????????//如果當(dāng)前goroutine被喚醒且mutex處于饑餓模式?則將鎖手遞手交給當(dāng)前goroutine
    ????if?old&(mutexLocked|mutexWoken)?!=?0?||?old>>mutexWaiterShift?==?0?{
    ?????throw("sync:?inconsistent?mutex?state")
    ????}
    ????????//等待狀態(tài)的goroutine?-?1
    ????delta?:=?int32(mutexLocked?-?1<????????//如果等待時間小于1ms?或?當(dāng)前goroutine是隊列中最后一個
    ????if?!starving?||?old>>mutexWaiterShift?==?1?{
    ??????//?退出饑餓模式
    ?????delta?-=?mutexStarving
    ????}
    ????atomic.AddInt32(&m.state,?delta)
    ????break
    ???}
    ???awoke?=?true
    ???iter?=?0
    ??}?else?{
    ???old?=?m.state
    ??}
    ?}
    }

    Unlock

    解鎖分兩種情況

    1. 當(dāng)前只有一個goroutine占有鎖 unlock完 直接結(jié)束
    func?(m?*Mutex)?Unlock()?{

    ?//?去除加鎖狀態(tài)
    ?new?:=?atomic.AddInt32(&m.state,?-mutexLocked)
    ?if?new?!=?0?{//存在等待的goroutine
    ??m.unlockSlow(new)
    ?}
    }
    1. unlock完畢mutex.state!=0 則存在以下可能
      • 直接將鎖交給等待隊列的第一個goroutine
      • 當(dāng)前存在等待goroutine 然后喚醒它 但不是第一個goroutine
      • 當(dāng)前存在自旋等待的goroutine 則不喚醒其他等待gorotune
      • 正常模式下
      • 饑餓模式下
    func?(m?*Mutex)?unlockSlow(new?int32)?{
    ??//未加鎖的情況下不能多次調(diào)用unlock
    ?if?(new+mutexLocked)&mutexLocked?==?0?{
    ??throw("sync:?unlock?of?unlocked?mutex")
    ?}
    ?if?new&mutexStarving?==?0?{//正常模式下
    ??old?:=?new
    ??for?{
    ??????//沒有等待的goroutine?或?已經(jīng)存在一個獲得鎖?或被喚醒?或處于饑餓模式下不需要喚醒任何處于等待的goroutine
    ???if?old>>mutexWaiterShift?==?0?||?old&(mutexLocked|mutexWoken|mutexStarving)?!=?0?{
    ????return
    ???}
    ???//?等待狀態(tài)goroutine數(shù)量-1?并設(shè)置喚醒狀態(tài)為1?然后喚醒一個等待goroutine
    ???new?=?(old?-?1<???if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
    ????????//喚醒一個阻塞的goroutine?但不是第一個等待者
    ????runtime_Semrelease(&m.sema,?false,?1)
    ????return
    ???}
    ???old?=?m.state
    ??}
    ?}?else?{
    ????//饑餓模式下手遞手將鎖交給隊列第一個等待的goroutine
    ????//即使期間有新來的goroutine到來,只要處于饑餓模式?鎖就不會被新來的goroutine搶占
    ??runtime_Semrelease(&m.sema,?true,?1)
    ?}
    }

    信號量

    上面可以看到Mutexgoroutine的阻塞和喚醒操作是利用semaphore來實現(xiàn)的,大致的思路是:Go runtime維護了一個全局的變量semtable,它保持了所有的信號量

    //?Prime?to?not?correlate?with?any?user?patterns.
    const?semTabSize?=?251

    var?semtable?[semTabSize]struct?{
    ?root?semaRoot
    ?pad??[cpu.CacheLinePadSize?-?unsafe.Sizeof(semaRoot{})]byte
    }

    每個信號量都由一個變量地址指定,Mutex的栗子里就是mutex.sema的地址

    type?semaRoot?struct?{
    ?lock??mutex
    ?treap?*sudog?//?root?of?balanced?tree?of?unique?waiters.
    ?nwait?uint32?//?Number?of?waiters.?Read?w/o?the?lock.
    }

    大致畫了下其數(shù)據(jù)結(jié)構(gòu)

    semtable
    1. 當(dāng)goroutine未獲取到鎖,需要阻塞時調(diào)用sync.runtime_SemacquireMutex 進入阻塞邏輯
    //go:linkname?sync_runtime_SemacquireMutex?sync.runtime_SemacquireMutex
    func?sync_runtime_SemacquireMutex(addr?*uint32,?lifo?bool,?skipframes?int)?{
    ?semacquire1(addr,?lifo,?semaBlockProfile|semaMutexProfile,?skipframes)
    }

    func?semacquire1(addr?*uint32,?lifo?bool,?profile?semaProfileFlags,?skipframes?int)?{
    ?gp?:=?getg()
    ?if?gp?!=?gp.m.curg?{
    ??throw("semacquire?not?on?the?G?stack")
    ?}

    ?//?低成本case
    ??//?若addr大于1?并通過CAS?-1?成功,則獲取信號量成功?不需要阻塞
    ?if?cansemacquire(addr)?{
    ??return
    ?}

    ?//?復(fù)雜?case:
    ?//?增加等待goroutine數(shù)量
    ?//?再次嘗試cansemacquire?成功則返回
    ?//?失敗則將自己作為一個waiter入隊
    ?//?sleep
    ?//?(waiter?descriptor?is?dequeued?by?signaler)
    ?s?:=?acquireSudog()
    ?root?:=?semroot(addr)
    ?t0?:=?int64(0)
    ?s.releasetime?=?0
    ?s.acquiretime?=?0
    ?s.ticket?=?0
    ?if?profile&semaBlockProfile?!=?0?&&?blockprofilerate?>?0?{
    ??t0?=?cputicks()
    ??s.releasetime?=?-1
    ?}
    ?if?profile&semaMutexProfile?!=?0?&&?mutexprofilerate?>?0?{
    ??if?t0?==?0?{
    ???t0?=?cputicks()
    ??}
    ??s.acquiretime?=?t0
    ?}
    ?for?{
    ??lock(&root.lock)
    ??//?給nwait+1?這樣semrelease中不會進低成本路徑了
    ??atomic.Xadd(&root.nwait,?1)
    ??//?檢查?cansemacquire?避免錯過喚醒
    ??if?cansemacquire(addr)?{
    ???atomic.Xadd(&root.nwait,?-1)
    ???unlock(&root.lock)
    ???break
    ??}
    ????//cansemacquire之后的semrelease都可以知道我們正在等待
    ????//上面設(shè)置了nwait,所以會直接進入sleep?即goparkunlock
    ??root.queue(addr,?s,?lifo)
    ??goparkunlock(&root.lock,?waitReasonSemacquire,?traceEvGoBlockSync,?4+skipframes)
    ??if?s.ticket?!=?0?||?cansemacquire(addr)?{
    ???break
    ??}
    ?}
    ?if?s.releasetime?>?0?{
    ??blockevent(s.releasetime-t0,?3+skipframes)
    ?}
    ?releaseSudog(s)
    }

    如果addr大于1并通過CAS-1成功則獲取信號量成功,直接返回

    否則通過對信號量地址偏移取模&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root拿到semaRoot(這里個3和251 沒有明白為什么是這兩個數(shù)???),semaRoot包含了一個sudog鏈表和一個nwait整型字段。nwait表示該信號量上阻塞等待的g的數(shù)量,同時為了保證線程安全需要一個互斥量來保護鏈表。

    這里需要注意的是 此處的runtime.mutex并不是之前所說的sync.Mutex,是內(nèi)部的一個簡單版本

    簡單來說,sync_runtime_Semacquire就是wait知道*s>0 然后原子的遞減它,來完成同步過程中簡單的睡眠原語

    1. 當(dāng)goroutine要釋放鎖 喚醒等待的g時調(diào)用sync.runtime_Semrelease
    //go:linkname?sync_runtime_Semrelease?sync.runtime_Semrelease
    func?sync_runtime_Semrelease(addr?*uint32,?handoff?bool,?skipframes?int)?{
    ?semrelease1(addr,?handoff,?skipframes)
    }

    func?semrelease1(addr?*uint32,?handoff?bool,?skipframes?int)?{
    ?root?:=?semroot(addr)
    ?atomic.Xadd(addr,?1)

    ?//?Easy?case:?no?waiters?
    ?//?這個檢查必須發(fā)生在xadd之后?避免錯過喚醒
    ?//?(see?loop?in?semacquire).
    ?if?atomic.Load(&root.nwait)?==?0?{
    ??return
    ?}

    ?//?Harder?case:?搜索一個waiter?并喚醒它
    ?lock(&root.lock)
    ?if?atomic.Load(&root.nwait)?==?0?{
    ??//?count值已經(jīng)被另一個goroutine消費了
    ??//?所以不需要喚醒其他goroutine
    ??unlock(&root.lock)
    ??return
    ?}
    ?s,?t0?:=?root.dequeue(addr)
    ?if?s?!=?nil?{
    ??atomic.Xadd(&root.nwait,?-1)
    ?}
    ?unlock(&root.lock)
    ?if?s?!=?nil?{?//?May?be?slow,?so?unlock?first
    ??acquiretime?:=?s.acquiretime
    ??if?acquiretime?!=?0?{
    ???mutexevent(t0-acquiretime,?3+skipframes)
    ??}
    ??if?s.ticket?!=?0?{
    ???throw("corrupted?semaphore?ticket")
    ??}
    ??if?handoff?&&?cansemacquire(addr)?{
    ???s.ticket?=?1
    ??}
    ??readyWithTime(s,?5+skipframes)
    ?}
    }

    關(guān)于信號量更深層的研究可以看下semaphore in plan9

    總結(jié)

    通過看源碼發(fā)現(xiàn)個有意思的問題:如果goroutine g1加的鎖 可以被另一個goroutine g2解鎖,但是等到g1解鎖的時候就會panic



    推薦閱讀


    福利

    我為大家整理了一份從入門到進階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復(fù)?ebook?獲取;還可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

    瀏覽 104
    點贊
    評論
    收藏
    分享

    手機掃一掃分享

    分享
    舉報
    評論
    圖片
    表情
    推薦
    點贊
    評論
    收藏
    分享

    手機掃一掃分享

    分享
    舉報

    <kbd id="5sdj3"></kbd>
    <th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>
    麻豆AV三级片 | 中文字幕32页 | 99精品视频国产 | 美腿丝袜亚洲综合 | 亚洲黄色在线看 |