分布式事務(wù)、分布式鎖、分布式session
點擊上方藍色“程序猿DD”,選擇“設(shè)為星標(biāo)”
回復(fù)“資源”獲取獨家整理的學(xué)習(xí)資料!

一、分布式session
session 是啥?瀏覽器有個 cookie,在一段時間內(nèi)這個 cookie 都存在,然后每次發(fā)請求過來都帶上一個特殊的?jsessionid cookie,就根據(jù)這個東西,在服務(wù)端可以維護一個對應(yīng)的 session 域,里面可以放點數(shù)據(jù)。
一般的話只要你沒關(guān)掉瀏覽器,cookie 還在,那么對應(yīng)的那個 session 就在,但是如果 cookie 沒了,session 也就沒了。常見于什么購物車之類的東西,還有登錄狀態(tài)保存之類的。
這個不多說了,懂 Java 的都該知道這個。
單塊系統(tǒng)的時候這么玩兒 session 沒問題,但是你要是分布式系統(tǒng)呢,那么多的服務(wù),session 狀態(tài)在哪兒維護???
(1)完全不用 session
使用 JWT Token 儲存用戶身份,然后再從數(shù)據(jù)庫或者 cache 中獲取其他的信息。這樣無論請求分配到哪個服務(wù)器都無所謂
(2)tomcat + redis
這個其實還挺方便的,就是使用 session 的代碼,跟以前一樣,還是基于 tomcat 原生的 session 支持即可,然后就是用一個叫做?Tomcat RedisSessionManager?的東西,讓所有我們部署的 tomcat 都將 session 數(shù)據(jù)存儲到 redis 即可。
在 tomcat 的配置文件中配置:
<Valve?className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve"?/>
<Manager?className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
?????????host="{redis.host}"
?????????port="{redis.port}"
?????????database="{redis.dbnum}"
?????????maxInactiveInterval="60"/>然后指定 redis 的 host 和 port 就 ok 了。
<Valve?className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve"?/>
<Manager?className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
?????????sentinelMaster="mymaster"
?????????sentinels=":26379,:26379,:26379"
?????????maxInactiveInterval="60"/>還可以用上面這種方式基于 redis 哨兵支持的 redis 高可用集群來保存 session 數(shù)據(jù),都是 ok 的。
(3)spring?session + redis
上面所說的第二種方式會與 tomcat 容器重耦合,如果我要將 web 容器遷移成 jetty,難道還要重新把 jetty 都配置一遍?
因為上面那種 tomcat + redis 的方式好用,但是會嚴(yán)重依賴于web容器,不好將代碼移植到其他 web 容器上去,尤其是你要是換了技術(shù)棧咋整?比如換成了 spring cloud 或者是 spring boot 之類的呢?
所以現(xiàn)在比較好的還是基于 Java 一站式解決方案,也就是 spring。人家 spring 基本上承包了大部分我們需要使用的框架,spirng cloud 做微服務(wù),spring boot 做腳手架,所以用 sping session 是一個很好的選擇。
在 pom.xml 中配置:
<dependency>
????<groupId>org.springframework.sessiongroupId>
????<artifactId>spring-session-data-redisartifactId>
????<version>1.2.1.RELEASEversion>
dependency>
<dependency>
??????<groupId>redis.clientsgroupId>
??????<artifactId>jedisartifactId>
??????<version>2.8.1version>
dependency>在 spring 配置文件中配置:
<bean?id="redisHttpSessionConfiguration"
class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration">
????<property?name="maxInactiveIntervalInSeconds"?value="600"/>
bean>
<bean?id="jedisPoolConfig"?class="redis.clients.jedis.JedisPoolConfig">
????<property?name="maxTotal"?value="100"?/>
????<property?name="maxIdle"?value="10"?/>
bean>
<bean?id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"?destroy-method="destroy">
????<property?name="hostName"?value="${redis_hostname}"/>
????<property?name="port"?value="${redis_port}"/>
????<property?name="password"?value="${redis_pwd}"?/>
????<property?name="timeout"?value="3000"/>
????<property?name="usePool"?value="true"/>
????<property?name="poolConfig"?ref="jedisPoolConfig"/>
bean>在 web.xml 中配置:
<filter>
??<filter-name>springSessionRepositoryFilterfilter-name>
??<filter-class>org.springframework.web.filter.DelegatingFilterProxyfilter-class>
filter>
<filter-mapping>
??<filter-name>springSessionRepositoryFilterfilter-name>
??<url-pattern>/*url-pattern>
filter-mapping>示例代碼:
@RestController
@RequestMapping("/test")
public?class?TestController?{
??@RequestMapping("/putIntoSession")
??public?String?putIntoSession(HttpServletRequest?request,?String?username)?{
????request.getSession().setAttribute("name",??"leo");
????return?"ok";
??}
????
@RequestMapping("/getFromSession")
??public?String?getFromSession(HttpServletRequest?request,?Model?model){
????String?name?=?request.getSession().getAttribute("name");
????return?name;
??}
??
}
上面的代碼就是 ok 的,給 sping session 配置基于 redis 來存儲 session 數(shù)據(jù),然后配置了一個 spring session 的過濾器,這樣的話,session 相關(guān)操作都會交給 spring session 來管了。接著在代碼中,就用原生的 session 操作,就是直接基于 spring sesion 從 redis 中獲取數(shù)據(jù)了。
實現(xiàn)分布式的會話有很多種方式,我說的只不過是比較常見的幾種方式,tomcat + redis 早期比較常用,但是會重耦合到 tomcat 中;近些年,通過 spring session 來實現(xiàn)。
二、分布式事務(wù)
當(dāng)我們的單個數(shù)據(jù)庫的性能產(chǎn)生瓶頸的時候,我們可能會對數(shù)據(jù)庫進行分區(qū),這里所說的分區(qū)指的是物理分區(qū),分區(qū)之后可能不同的庫就處于不同的服務(wù)器上了,這個時候單個數(shù)據(jù)庫的ACID已經(jīng)不能適應(yīng)這種情況了,而在這種ACID的集群環(huán)境下,再想保證集群的ACID幾乎是很難達到,或者即使能達到那么效率和性能會大幅下降,最為關(guān)鍵的是再很難擴展新的分區(qū)了,這個時候如果再追求集群的ACID會導(dǎo)致我們的系統(tǒng)變得很差,這時我們就需要引入一個新的理論原則來適應(yīng)這種集群的情況,就是 CAP 原則或者叫CAP定理,那么CAP定理指的是什么呢?
CAP定理
CAP定理是由加州大學(xué)伯克利分校Eric Brewer教授提出來的,他指出WEB服務(wù)無法同時滿足一下3個屬性:
一致性(Consistency) :客戶端知道一系列的操作都會同時發(fā)生(生效)
可用性(Availability) :每個操作都必須以可預(yù)期的響應(yīng)結(jié)束
分區(qū)容錯性(Partition tolerance) :即使出現(xiàn)單個組件無法可用,操作依然可以完成
具體地講在分布式系統(tǒng)中,在任何數(shù)據(jù)庫設(shè)計中,一個Web應(yīng)用至多只能同時支持上面的兩個屬性。顯然,任何橫向擴展策略都要依賴于數(shù)據(jù)分區(qū)。因此,設(shè)計人員必須在一致性與可用性之間做出選擇。
這個定理在迄今為止的分布式系統(tǒng)中都是適用的!?為什么這么說呢?
這個時候有同學(xué)可能會把數(shù)據(jù)庫的2PC(兩階段提交)搬出來說話了。OK,我們就來看一下數(shù)據(jù)庫的兩階段提交。
對數(shù)據(jù)庫分布式事務(wù)有了解的同學(xué)一定知道數(shù)據(jù)庫支持的2PC,又叫做 XA Transactions。
其中,XA 是一個兩階段提交協(xié)議,該協(xié)議分為以下兩個階段:
第一階段:事務(wù)協(xié)調(diào)器要求每個涉及到事務(wù)的數(shù)據(jù)庫預(yù)提交(precommit)此操作,并反映是否可以提交.
第二階段:事務(wù)協(xié)調(diào)器要求每個數(shù)據(jù)庫提交數(shù)據(jù)。
其中,如果有任何一個數(shù)據(jù)庫否決此次提交,那么所有數(shù)據(jù)庫都會被要求回滾它們在此事務(wù)中的那部分信息。這樣做的缺陷是什么呢? 咋看之下我們可以在數(shù)據(jù)庫分區(qū)之間獲得一致性。
如果CAP 定理是對的,那么它一定會影響到可用性。
如果說系統(tǒng)的可用性代表的是執(zhí)行某項操作相關(guān)所有組件的可用性的和。那么在兩階段提交的過程中,可用性就代表了涉及到的每一個數(shù)據(jù)庫中可用性的和。我們假設(shè)兩階段提交的過程中每一個數(shù)據(jù)庫都具有99.9%的可用性,那么如果兩階段提交涉及到兩個數(shù)據(jù)庫,這個結(jié)果就是99.8%。根據(jù)系統(tǒng)可用性計算公式,假設(shè)每個月43200分鐘,99.9%的可用性就是43157分鐘, 99.8%的可用性就是43114分鐘,相當(dāng)于每個月的宕機時間增加了43分鐘。
以上,可以驗證出來,CAP定理從理論上來講是正確的,CAP我們先看到這里,等會再接著說。
在分布式系統(tǒng)中,要實現(xiàn)分布式事務(wù),無外乎那幾種解決方案。
分布式事務(wù)的實現(xiàn)主要有以下 5 種方案:
XA 方案
TCC 方案
本地消息表
可靠消息最終一致性方案
最大努力通知方案
兩階段提交方案/XA方案
所謂的 XA 方案,即:兩階段提交,有一個事務(wù)管理器的概念,負(fù)責(zé)協(xié)調(diào)多個數(shù)據(jù)庫(資源管理器)的事務(wù),事務(wù)管理器先問問各個數(shù)據(jù)庫你準(zhǔn)備好了嗎?如果每個數(shù)據(jù)庫都回復(fù) ok,那么就正式提交事務(wù),在各個數(shù)據(jù)庫上執(zhí)行操作;如果任何其中一個數(shù)據(jù)庫回答不 ok,那么就回滾事務(wù)。
這種分布式事務(wù)方案,比較適合單塊應(yīng)用里,跨多個庫的分布式事務(wù),而且因為嚴(yán)重依賴于數(shù)據(jù)庫層面來搞定復(fù)雜的事務(wù),效率很低,絕對不適合高并發(fā)的場景。如果要玩兒,那么基于?Spring + JTA?就可以搞定,自己隨便搜個 demo 看看就知道了。
這個方案,我們很少用,一般來說某個系統(tǒng)內(nèi)部如果出現(xiàn)跨多個庫的這么一個操作,是不合規(guī)的。我可以給大家介紹一下, 現(xiàn)在微服務(wù),一個大的系統(tǒng)分成幾十個甚至幾百個服務(wù)。一般來說,我們的規(guī)定和規(guī)范,是要求每個服務(wù)只能操作自己對應(yīng)的一個數(shù)據(jù)庫。
如果你要操作別的服務(wù)對應(yīng)的庫,不允許直連別的服務(wù)的庫,違反微服務(wù)架構(gòu)的規(guī)范,你隨便交叉胡亂訪問,幾百個服務(wù)的話,全體亂套,這樣的一套服務(wù)是沒法管理的,沒法治理的,可能會出現(xiàn)數(shù)據(jù)被別人改錯,自己的庫被別人寫掛等情況。
如果你要操作別人的服務(wù)的庫,你必須是通過調(diào)用別的服務(wù)的接口來實現(xiàn),絕對不允許交叉訪問別人的數(shù)據(jù)庫。
TCC 方案
TCC 的全稱是:Try、Confirm、Cancel。
Try 階段:這個階段說的是對各個服務(wù)的資源做檢測以及對資源進行鎖定或者預(yù)留。
Confirm 階段:這個階段說的是在各個服務(wù)中執(zhí)行實際的操作。
Cancel 階段:如果任何一個服務(wù)的業(yè)務(wù)方法執(zhí)行出錯,那么這里就需要進行補償,就是執(zhí)行已經(jīng)執(zhí)行成功的業(yè)務(wù)邏輯的回滾操作。(把那些執(zhí)行成功的回滾)
這種方案說實話幾乎很少人使用,我們用的也比較少,但是也有使用的場景。因為這個事務(wù)回滾實際上是嚴(yán)重依賴于你自己寫代碼來回滾和補償了,會造成補償代碼巨大,非常之惡心。
比如說我們,一般來說跟錢相關(guān)的,跟錢打交道的,支付、交易相關(guān)的場景,我們會用 TCC,嚴(yán)格保證分布式事務(wù)要么全部成功,要么全部自動回滾,嚴(yán)格保證資金的正確性,保證在資金上不會出現(xiàn)問題。
而且最好是你的各個業(yè)務(wù)執(zhí)行的時間都比較短。
但是說實話,一般盡量別這么搞,自己手寫回滾邏輯,或者是補償邏輯,實在太惡心了,那個業(yè)務(wù)代碼是很難維護的。

本地消息表
本地消息表其實是國外的 ebay 搞出來的這么一套思想。
這個大概意思是這樣的:
A 系統(tǒng)在自己本地一個事務(wù)里操作同時,插入一條數(shù)據(jù)到消息表;
接著 A 系統(tǒng)將這個消息發(fā)送到 MQ 中去;
B 系統(tǒng)接收到消息之后,在一個事務(wù)里,往自己本地消息表里插入一條數(shù)據(jù),同時執(zhí)行其他的業(yè)務(wù)操作,如果這個消息已經(jīng)被處理過了,那么此時這個事務(wù)會回滾,這樣保證不會重復(fù)處理消息;
B 系統(tǒng)執(zhí)行成功之后,就會更新自己本地消息表的狀態(tài)以及 A 系統(tǒng)消息表的狀態(tài);
如果 B 系統(tǒng)處理失敗了,那么就不會更新消息表狀態(tài),那么此時 A 系統(tǒng)會定時掃描自己的消息表,如果有未處理的消息,會再次發(fā)送到 MQ 中去,讓 B 再次處理;
這個方案保證了最終一致性,哪怕 B 事務(wù)失敗了,但是 A 會不斷重發(fā)消息,直到 B 那邊成功為止。
這個方案說實話最大的問題就在于嚴(yán)重依賴于數(shù)據(jù)庫的消息表來管理事務(wù)啥的,如果是高并發(fā)場景咋辦呢?咋擴展呢?所以一般確實很少用。
可靠消息最終一致性方案
這個的意思,就是干脆不要用本地的消息表了,直接基于 MQ 來實現(xiàn)事務(wù)。比如阿里的 RocketMQ 就支持消息事務(wù)。
大概的意思就是:
A 系統(tǒng)先發(fā)送一個 prepared 消息到 mq,如果這個 prepared 消息發(fā)送失敗那么就直接取消操作別執(zhí)行了;
如果這個消息發(fā)送成功過了,那么接著執(zhí)行本地事務(wù),如果成功就告訴 mq 發(fā)送確認(rèn)消息,如果失敗就告訴 mq 回滾消息;
如果發(fā)送了確認(rèn)消息,那么此時 B 系統(tǒng)會接收到確認(rèn)消息,然后執(zhí)行本地的事務(wù);
mq 會自動定時輪詢所有 prepared 消息回調(diào)你的接口,問你,這個消息是不是本地事務(wù)處理失敗了,所有沒發(fā)送確認(rèn)的消息,是繼續(xù)重試還是回滾?一般來說這里你就可以查下數(shù)據(jù)庫看之前本地事務(wù)是否執(zhí)行,如果回滾了,那么這里也回滾吧。這個就是避免可能本地事務(wù)執(zhí)行成功了,而確認(rèn)消息卻發(fā)送失敗了。
這個方案里,要是系統(tǒng) B 的事務(wù)失敗了咋辦?重試咯,自動不斷重試直到成功,如果實在是不行,要么就是針對重要的資金類業(yè)務(wù)進行回滾,比如 B 系統(tǒng)本地回滾后,想辦法通知系統(tǒng) A 也回滾;或者是發(fā)送報警由人工來手工回滾和補償。
這個還是比較合適的,目前國內(nèi)互聯(lián)網(wǎng)公司大都是這么玩兒的,要不你舉用 RocketMQ 支持的,要不你就自己基于類似 ActiveMQ?RabbitMQ?自己封裝一套類似的邏輯出來,總之思路就是這樣子的。

最大努力通知方案
這個方案的大致意思就是:
系統(tǒng) A 本地事務(wù)執(zhí)行完之后,發(fā)送個消息到 MQ;
這里會有個專門消費 MQ 的最大努力通知服務(wù),這個服務(wù)會消費 MQ 然后寫入數(shù)據(jù)庫中記錄下來,或者是放入個內(nèi)存隊列也可以,接著調(diào)用系統(tǒng) B 的接口;
要是系統(tǒng) B 執(zhí)行成功就 ok 了;要是系統(tǒng) B 執(zhí)行失敗了,那么最大努力通知服務(wù)就定時嘗試重新調(diào)用系統(tǒng) B,反復(fù) N 次,最后還是不行就放棄。
你們公司是如何處理分布式事務(wù)的?
如果你真的被問到,可以這么說,我們某某特別嚴(yán)格的場景,用的是 TCC 來保證強一致性;然后其他的一些場景基于阿里的 RocketMQ 來實現(xiàn)分布式事務(wù)。
你找一個嚴(yán)格資金要求絕對不能錯的場景,你可以說你是用的 TCC 方案;如果是一般的分布式事務(wù)場景,訂單插入之后要調(diào)用庫存服務(wù)更新庫存,庫存數(shù)據(jù)沒有資金那么的敏感,可以用可靠消息最終一致性方案。
友情提示一下,RocketMQ 3.2.6 之前的版本,是可以按照上面的思路來的,但是之后接口做了一些改變,我這里不再贅述了。
當(dāng)然如果你愿意,你可以參考可靠消息最終一致性方案來自己實現(xiàn)一套分布式事務(wù),比如基于 RocketMQ 來玩兒。
三、分布式鎖
redis 分布式鎖
redis 最普通的分布式鎖
第一個最普通的實現(xiàn)方式,就是在 redis 里創(chuàng)建一個 key,這樣就算加鎖。
SET my:lock 隨機值 NX PX 30000
執(zhí)行這個命令就 ok。
NX:表示只有?key?不存在的時候才會設(shè)置成功。(如果此時 redis 中存在這個 key,那么設(shè)置失敗,返回?nil)PX 30000:意思是 30s 后鎖自動釋放。別人創(chuàng)建的時候如果發(fā)現(xiàn)已經(jīng)有了就不能加鎖了。
釋放鎖就是刪除 key ,但是一般可以用?lua?腳本刪除,判斷 value 一樣才刪除:
-- 刪除鎖的時候,找到 key 對應(yīng)的 value,跟自己傳過去的 value 做比較,如果是一樣的
if?redis.call("get",KEYS[1])?==?ARGV[1]?then
??return?redis.call("del",KEYS[1])
else
??return?0
end
為啥要用隨機值呢?因為如果某個客戶端獲取到了鎖,但是阻塞了很長時間才執(zhí)行完,比如說超過了 30s,此時可能已經(jīng)自動釋放鎖了,此時可能別的客戶端已經(jīng)獲取到了這個鎖,要是你這個時候直接刪除 key 的話會有問題,所以得用隨機值加上面的?lua?腳本來釋放鎖。
但是這樣是肯定不行的。因為如果是普通的 redis 單實例,那就是單點故障?;蛘呤?redis 普通主從,那 redis 主從異步復(fù)制,如果主節(jié)點掛了(key 就沒有了),key 還沒同步到從節(jié)點,此時從節(jié)點切換為主節(jié)點,別人就可以 set key,從而拿到鎖。
RedLock 算法
這個場景是假設(shè)有一個 redis cluster,有 5 個 redis master 實例。然后執(zhí)行如下步驟獲取一把鎖:
獲取當(dāng)前時間戳,單位是毫秒;
跟上面類似,輪流嘗試在每個 master 節(jié)點上創(chuàng)建鎖,過期時間較短,一般就幾十毫秒;
嘗試在大多數(shù)節(jié)點上建立一個鎖,比如 5 個節(jié)點就要求是 3 個節(jié)點?
n / 2 + 1;客戶端計算建立好鎖的時間,如果建立鎖的時間小于超時時間,就算建立成功了;
要是鎖建立失敗了,那么就依次之前建立過的鎖刪除;
只要別人建立了一把分布式鎖,你就得不斷輪詢?nèi)L試獲取鎖。
?
zk 分布式鎖
zk 分布式鎖,其實可以做的比較簡單,就是某個節(jié)點嘗試創(chuàng)建臨時 znode,此時創(chuàng)建成功了就獲取了這個鎖;這個時候別的客戶端來創(chuàng)建鎖會失敗,只能注冊個監(jiān)聽器監(jiān)聽這個鎖。釋放鎖就是刪除這個 znode,一旦釋放掉就會通知客戶端,然后有一個等待著的客戶端就可以再次重新加鎖。
public?class?ZooKeeperSession?{
private?static?CountDownLatch?connectedSemaphore?=?new?CountDownLatch(1);
private?ZooKeeper?zookeeper;
private?CountDownLatch?latch;
public?ZooKeeperSession()?{
try?{
this.zookeeper?=?new?ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181",?50000,?new?ZooKeeperWatcher());
try?{
????????????????connectedSemaphore.await();
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????System.out.println("ZooKeeper?session?established......");
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
/**
?????*?獲取分布式鎖
?????*
?????*?@param?productId
?????*/
public?Boolean?acquireDistributedLock(Long?productId)?{
????????String?path?=?"/product-lock-"?+?productId;
try?{
????????????zookeeper.create(path,?"".getBytes(),?Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL);
return?true;
????????}?catch?(Exception?e)?{
while?(true)?{
try?{
//?相當(dāng)于是給node注冊一個監(jiān)聽器,去看看這個監(jiān)聽器是否存在
????????????????????Stat?stat?=?zk.exists(path,?true);
if?(stat?!=?null)?{
this.latch?=?new?CountDownLatch(1);
this.latch.await(waitTime,?TimeUnit.MILLISECONDS);
this.latch?=?null;
????????????????????}
????????????????????zookeeper.create(path,?"".getBytes(),?Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL);
return?true;
????????????????}?catch?(Exception?ee)?{
continue;
????????????????}
????????????}
????????}
return?true;
????}
/**
?????*?釋放掉一個分布式鎖
?????*
?????*?@param?productId
?????*/
public?void?releaseDistributedLock(Long?productId)?{
????????String?path?=?"/product-lock-"?+?productId;
try?{
????????????zookeeper.delete(path,?-1);
????????????System.out.println("release?the?lock?for?product[id="?+?productId?+?"]......");
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
/**
?????*?建立zk?session的watcher
?????*
?????*?@author?bingo
?????*?@since?2018/11/29
?????*
?????*/
private?class?ZooKeeperWatcher?implements?Watcher?{
public?void?process(WatchedEvent?event)?{
????????????System.out.println("Receive?watched?event:?"?+?event.getState());
if?(KeeperState.SyncConnected?==?event.getState())?{
????????????????connectedSemaphore.countDown();
????????????}
if?(this.latch?!=?null)?{
this.latch.countDown();
????????????}
????????}
????}
/**
?????*?封裝單例的靜態(tài)內(nèi)部類
?????*
?????*?@author?bingo
?????*?@since?2018/11/29
?????*
?????*/
private?static?class?Singleton?{
private?static?ZooKeeperSession?instance;
static?{
????????????instance?=?new?ZooKeeperSession();
????????}
public?static?ZooKeeperSession?getInstance()?{
return?instance;
????????}
????}
/**
?????*?獲取單例
?????*
?????*?@return
?????*/
public?static?ZooKeeperSession?getInstance()?{
return?Singleton.getInstance();
????}
/**
?????*?初始化單例的便捷方法
?????*/
public?static?void?init()?{
????????getInstance();
????}
}
也可以采用另一種方式,創(chuàng)建臨時順序節(jié)點:
如果有一把鎖,被多個人給競爭,此時多個人會排隊,第一個拿到鎖的人會執(zhí)行,然后釋放鎖;后面的每個人都會去監(jiān)聽排在自己前面的那個人創(chuàng)建的 node 上,一旦某個人釋放了鎖,排在自己后面的人就會被 zookeeper 給通知,一旦被通知了之后,就 ok 了,自己就獲取到了鎖,就可以執(zhí)行代碼了。
public?class?ZooKeeperDistributedLock?implements?Watcher?{
private?ZooKeeper?zk;
private?String?locksRoot?=?"/locks";
private?String?productId;
private?String?waitNode;
private?String?lockNode;
private?CountDownLatch?latch;
private?CountDownLatch?connectedLatch?=?new?CountDownLatch(1);
private?int?sessionTimeout?=?30000;
public?ZooKeeperDistributedLock(String?productId)?{
this.productId?=?productId;
try?{
????????????String?address?=?"192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
????????????zk?=?new?ZooKeeper(address,?sessionTimeout,?this);
????????????connectedLatch.await();
????????}?catch?(IOException?e)?{
throw?new?LockException(e);
????????}?catch?(KeeperException?e)?{
throw?new?LockException(e);
????????}?catch?(InterruptedException?e)?{
throw?new?LockException(e);
????????}
????}
public?void?process(WatchedEvent?event)?{
if?(event.getState()?==?KeeperState.SyncConnected)?{
????????????connectedLatch.countDown();
return;
????????}
if?(this.latch?!=?null)?{
this.latch.countDown();
????????}
????}
public?void?acquireDistributedLock()?{
try?{
if?(this.tryLock())?{
return;
????????????}?else?{
????????????????waitForLock(waitNode,?sessionTimeout);
????????????}
????????}?catch?(KeeperException?e)?{
throw?new?LockException(e);
????????}?catch?(InterruptedException?e)?{
throw?new?LockException(e);
????????}
????}
public?boolean?tryLock()?{
try?{
//?傳入進去的locksRoot?+?“/”?+?productId
//?假設(shè)productId代表了一個商品id,比如說1
//?locksRoot?=?locks
//?/locks/10000000000,/locks/10000000001,/locks/10000000002
????????????lockNode?=?zk.create(locksRoot?+?"/"?+?productId,?new?byte[0],?ZooDefs.Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL_SEQUENTIAL);
//?看看剛創(chuàng)建的節(jié)點是不是最小的節(jié)點
// locks:10000000000,10000000001,10000000002
????????????List?locks?=?zk.getChildren(locksRoot,?false);
????????????Collections.sort(locks);
if(lockNode.equals(locksRoot+"/"+?locks.get(0))){
//如果是最小的節(jié)點,則表示取得鎖
return?true;
????????????}
//如果不是最小的節(jié)點,找到比自己小1的節(jié)點
int?previousLockIndex?=?-1;
for(int?i?=?0;?i?if(lockNode.equals(locksRoot?+?“/”?+?locks.get(i)))?{
?????????????????????previousLockIndex?=?i?-?1;
break;
????????}
???????}
this.waitNode?=?locks.get(previousLockIndex);
????????}?catch?(KeeperException?e)?{
throw?new?LockException(e);
????????}?catch?(InterruptedException?e)?{
throw?new?LockException(e);
????????}
return?false;
????}
private?boolean?waitForLock(String?waitNode,?long?waitTime)?throws?InterruptedException,?KeeperException?{
????????Stat?stat?=?zk.exists(locksRoot?+?"/"?+?waitNode,?true);
if?(stat?!=?null)?{
this.latch?=?new?CountDownLatch(1);
this.latch.await(waitTime,?TimeUnit.MILLISECONDS);
this.latch?=?null;
????????}
return?true;
????}
public?void?unlock()?{
try?{
//?刪除/locks/10000000000節(jié)點
//?刪除/locks/10000000001節(jié)點
????????????System.out.println("unlock?"?+?lockNode);
????????????zk.delete(lockNode,?-1);
????????????lockNode?=?null;
????????????zk.close();
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}?catch?(KeeperException?e)?{
????????????e.printStackTrace();
????????}
????}
public?class?LockException?extends?RuntimeException?{
private?static?final?long?serialVersionUID?=?1L;
public?LockException(String?e)?{
????????????super(e);
????????}
public?LockException(Exception?e)?{
????????????super(e);
????????}
????}
} redis 分布式鎖和 zk 分布式鎖的對比
redis 分布式鎖,其實需要自己不斷去嘗試獲取鎖,比較消耗性能。
zk 分布式鎖,獲取不到鎖,注冊個監(jiān)聽器即可,不需要不斷主動嘗試獲取鎖,性能開銷較小。
另外一點就是,如果是 redis 獲取鎖的那個客戶端 出現(xiàn) bug 掛了,那么只能等待超時時間之后才能釋放鎖;而 zk 的話,因為創(chuàng)建的是臨時 znode,只要客戶端掛了,znode 就沒了,此時就自動釋放鎖。
redis 分布式鎖大家沒發(fā)現(xiàn)好麻煩嗎?遍歷上鎖,計算時間等等......zk 的分布式鎖語義清晰實現(xiàn)簡單。
所以先不分析太多的東西,就說這兩點,我個人實踐認(rèn)為 zk 的分布式鎖比 redis 的分布式鎖牢靠、而且模型簡單易用。?
程序員的眼里,不止有代碼和bug,還有詩與遠(yuǎn)方和妹子?。?!
往期推薦

我的星球是否適合你?
點擊閱讀原文看看我們都聊過啥?
