Redis分佈式鎖的原理及實現,助你輕鬆徜徉大數據時代

前言

大數據時代

,隨著業務場景越來越複雜,使用的架構也就越來越複雜,分佈式、高併發已經是業務要求的常態。說到分佈式,就不得不提分佈式鎖和分佈式事物。今天我們就來談談Redis實現的分佈式鎖的問題!

Redis分佈式鎖的原理及實現,助你輕鬆徜徉大數據時代

分佈式鎖簡介

談起編程語言中的鎖,開發者應該是相當熟悉的,當系統中存在多線程並且多線程之間存在競態條件或者需要協作的時候,我們就會使用到鎖,如Java中的Lock、Synchronized等,但是編程語言中提供的鎖,基本上都只適用於在同一個機器上運行的情況,在分佈式環境下並不適用。

而在某些情況下,我們是需要在多個機器實例/節點之間進行協作的,這個時候,就需要使用到分佈式鎖了。

顧名思義,分佈式鎖就是應用於在分佈式環境下多個節點之間進行同步或者協作的鎖

分佈式鎖同普通的鎖一樣,具有以下幾個重要特性

  • 互斥性,保證只有持有鎖的實例中的某個線程才能進行操作
  • 可重入性,同一個實例的同一個線程可以多次獲取鎖
  • 鎖超時,支持超時自動釋放鎖,避免死鎖的產生
  • 誰加的鎖只能由誰釋放

Redis分佈式鎖原理

由於Redis的命令本身是原子性的,所以,非常適合於作為分佈式鎖的協調者。

一般情況下,為了保證鎖的釋放只能由加鎖者或者超時釋放,一般我們會將對應鍵的值設置為一個線程唯一標誌,如為每個線程生成一個UUID,只有當線程的UUID與鎖的值一致時,才能釋放鎖。

利用Redis來實現分佈式的原理非常簡單,加鎖的時候為某個鍵設置值,釋放的時候將對應的鍵刪除即可。

不過在使用的時候,有一些需要注意的地方,下面我們詳細看下基於Redis不同命令來實現分佈式鎖的操作

setnx命令

Redis分佈式鎖的原理及實現,助你輕鬆徜徉大數據時代

在Redis2.6之前,常用於分佈式鎖的命令是:setnx key val,該命令在對應的鍵沒有值的時候設置成功,存在值的時候設置失敗,保證了同時只會有一個連接者設置成功,也即保證同時只會有一個實例的一個線程獲取成功。

但是該命令存在一個缺陷,不支持超時機制,所以需要額外的命令來保證能夠在超時的情況下釋放鎖,也就是刪除鍵,可以配合expire命令來實現。

由於上述操作涉及到兩個命令,所以最好的方式是通過lua腳本來實現加鎖的操作,如下所示

<code># KEYS[1]是鎖的名稱,KEYS[2]是鎖的值,KEYS[3]是鎖的超時時間
local c = redis.call('setnx', KEYS[1], KEYS[2])
if(c == 1) then
redis.call('expire', KEYS[1], KEYS[3])
end
return c/<code>

釋放鎖的時候,需要驗證釋放鎖的是不是鎖的持有者,具體代碼如下

<code># KEYS[1]是鎖的名稱,KEYS[2]是鎖的值
if redis.call('get', KEYS[1]) == KEYS[2] then
return redis.call('del', KEYS[1])
else return 0
end/<code>

set命令

從上面的setnx命令可以看到,加鎖的操作還是比較麻煩的,所以,在Redis2.6之後,redis的set命令進行了增強,設置值的時候,同時支持設置過期時間

<code># nx表示不存在的時候設置,ex表示設置過期時間,單位是秒
set LOCK VAL nx ex 15/<code>

可以看到,通過該命令,進行加鎖就方便很多了

釋放鎖的操作同setnx裡提到的釋放操作

Redis分佈式鎖實現

上面我們提到的是Redis分佈式鎖的實現原理,不過,每次需要用到鎖的時候都需要自己手動實現一次,雖然代碼本身沒有多少,其實也不是很方便。

正因為如此,有挺多的項目都實現了分佈式,並且提供了更加豐富的功能,如下面討論到的RedisLockRegistry

RedisLockRegistry

Redis分佈式鎖的原理及實現,助你輕鬆徜徉大數據時代

Spring-integration項目是Spring官方提供了集成各種工具的項目,通過integration-redis子項目,提供了非常豐富的功能,關於該項目,後面有時間再寫篇文章具體分析一下,這裡我們用到其中的一個組件RedisLockRegistry

導入依賴

<code><dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-integration/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.integration/<groupid>
<artifactid>spring-integration-redis/<artifactid>
/<dependency>/<code>

配置RedisLockRegistry

<code>@Configuration
public class RedisLockConfiguration {

@Bean
public RedisLockRegistry redisLockRegistry(
RedisConnectionFactory redisConnectionFactory) {
// 注意這裡的時間單位是毫秒
return new RedisLockRegistry(redisConnectionFactory, "registryKey", TIME);
}
}/<code>

RedisLockRegistry相當於一個鎖的管理倉庫,所有的鎖都可以從該倉庫獲取,所有鎖的鍵名為:registryKey:LOCK_NAME,默認時間為60s

配置完鎖的倉庫之後,只需要注入倉庫,當需要使用到鎖的時候,從倉庫中獲取一個鎖就可以了,如下所示

<code>Lock lock = redisLockRegistry.obtain("redis-lock");/<code>

該操作返回一個Lock對象,該對象其實是Spring實現的基於Redis的鎖,該鎖支持了豐富的功能,如tryLock等

但使用的時候,只需要跟普通的鎖一樣操作即可

<code>// lock.tryLock(10, TimeUnit.SECONDS);
lock.lock();
try {
// ops
}catch(Exception e) {

}finally {
// 釋放鎖
lock.unlock();
}/<code>

可以看到,通過RedisLockRegistry,我們可以更加方便地使用Redis分佈式鎖了

RedisLock

從上面的分析中可以看到,LockRegistry維護了一個RedisLock對象的Map,鍵是鎖的名稱,值是對應的Lock對象,該對象是Spring實現的一個內部類,具體實現如下所示

構造方法

<code>private RedisLock(String path) {
this.lockKey = constructLockKey(path);
}/<code>

RedisLock有且只有一個私有構造方法,所以僅能在當前類中進行構造,這也意味著我們無法自己實例化RedisLock實例

構造的過程非常簡單,只是初始化了lockKey,lockKey的內容如下

<code>private String constructLockKey(String path) {
return RedisLockRegistry.this.registryKey + ":" + path;
}/<code>

可以看到,lockKey的值其實就是Registry的名稱 + : + 鎖的名稱

核心方法

對於一把鎖而言,最最核心的方法莫過於加鎖和解鎖了,RedisLock實現了Lock接口,提供了多樣的加鎖方式,分別如下所示

不可中斷鎖

<code>private final ReentrantLock localLock = new ReentrantLock();

@Override
public void lock() {
this.localLock.lock();
while (true) {
try {
while (!obtainLock()) {
Thread.sleep(100); //NOSONAR
}

break;
}
catch (InterruptedException e) {
// 不可中斷,所以忽略中斷異常
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}/<code>

從上面的代碼可以看到,lock方法首先嚐試獲取ReentrantLock,如果獲取成功,才嘗試去獲取分佈式鎖,獲取localLock的目的在於,如果本地有多個線程在競爭該鎖,則只有獲取到本地的鎖的線程才能嘗試去獲取分佈式鎖,好處在於,減少了不必要的網絡開銷,提高性能

由於lock方法明確規定,如果獲取不到鎖,則進行阻塞,直至獲取到鎖或者出現異常,所以上面每隔100毫秒會去嘗試獲取鎖,直到獲取成功或者拋出異常為止

獲取鎖的代碼也非常簡單,如下所示

<code>// 實例化Registry的時候進行初始化
private final String clientId = UUID.randomUUID().toString();

private boolean obtainLock() {
Boolean success =
RedisLockRegistry.this.redisTemplate.execute(
// 獲取鎖的lua腳本

RedisLockRegistry.this.obtainLockScript,
// 獲取的鎖名稱
Collections.singletonList(this.lockKey),
// 鎖的內容
RedisLockRegistry.this.clientId,
// 鎖的過期時間
String.valueOf(RedisLockRegistry.this.expireAfter));

boolean result = Boolean.TRUE.equals(success);
\t
// 如果獲取成功,則記錄鎖的時間
if (result) {
this.lockedAt = System.currentTimeMillis();
}
return result;
}/<code>

從上面獲取鎖的代碼可以看到,每一個LockRegistry實例只會有一個值,該值在Registry實例化的時候通過UUID生成,一個實例內的多個線程之間的競爭直接通過ReentrantLock進行,不涉及到Redis相關的操作。

可中斷鎖

<code>@Override
public void lockInterruptibly() throws InterruptedException {
this.localLock.lockInterruptibly();
try {
while (!obtainLock()) {
Thread.sleep(100); //NOSONAR
}
}
catch (InterruptedException ie) {
// 釋放鎖,並且響應中斷信號
this.localLock.unlock();
Thread.currentThread().interrupt();
throw ie;
}
catch (Exception e) {
this.localLock.unlock();

rethrowAsLockException(e);
}
}/<code>

看懂了lock的代碼,再來看lockInterruptibly就非常簡單了,lock不響應中斷信號,則lockInterruptibly則相應中斷信號,因此,獲取鎖的過程如果出現中斷,則結束獲取操作了

嘗試獲取鎖

嘗試獲取鎖以為著如果能獲取鎖,則獲取,如果不能獲取,則結束,當然,可以附帶等待是時間,有兩個版本的tryLock,如下

<code>@Override
public boolean tryLock() {
try {
// 調用另一個tryLock,並且將時間設置為0
return tryLock(0, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
// 先嚐試獲取本地鎖,如果在指定時間內無法獲取到本地鎖,則放棄
if (!this.localLock.tryLock(time, unit)) {
return false;
}
try {

// 記錄獲取鎖到期時間

long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
boolean acquired;

// 如果獲取不到鎖,並且時間還有剩餘,則先休眠100毫秒,然後繼續嘗試獲取
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) {
Thread.sleep(100); //NOSONAR
}
// 到這裡表示獲取鎖超時
// 如果無法獲取到分佈式鎖,則釋放本地鎖
if (!acquired) {
this.localLock.unlock();
}
return acquired;
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
return false;
}/<code>

具體的分析都詳細寫在註釋裡了,補充一點就是,從tryLock的實現中可以看到,tryLock本身是響應中斷的,與接口的定義一致

釋放鎖

<code>// 判斷鎖的所有者是否是當前實例
public boolean isAcquiredInThisProcess() {
return RedisLockRegistry.this.clientId.equals(
RedisLockRegistry.this.redisTemplate.boundValueOps(this.lockKey).get());
}

// 刪除對應的鍵,也即釋放分佈式鎖
private void removeLockKey() {
if (this.unlinkAvailable) {

RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
}
else {
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}

@Override
public void unlock() {
// 如果嘗試釋放的不是本線程加的鎖,則拋出異常
if (!this.localLock.isHeldByCurrentThread()) {
throw new IllegalStateException("You do not own lock at " + this.lockKey);
}
// 當前線程持有的鎖的數量,即重入的次數
// 如果此時 > 1,表示當前線程有多次獲取鎖,釋放的時候只減少本地鎖的次數
// 此時其他的方法還持有鎖,不能釋放分佈式鎖
if (this.localLock.getHoldCount() > 1) {
this.localLock.unlock();
return;
}
try {
// 此時分佈式鎖已經由於超時被釋放了,拋出異常
if (!isAcquiredInThisProcess()) {
throw new IllegalStateException("Lock was released in the store due to expiration. " + "The integrity of data protected by this lock may have been compromised.");
}
\t\t
// 如果收到中斷信號,則異步釋放鎖
// 儘快響應中斷...
if (Thread.currentThread().isInterrupted()) {
RedisLockRegistry.this.executor.execute(this::removeLockKey);
}
else {
removeLockKey();
}

if (logger.isDebugEnabled()) {
logger.debug("Released lock; " + this);

}
}
catch (Exception e) {
ReflectionUtils.rethrowRuntimeException(e);
}
finally {
this.localLock.unlock();
}
}/<code>

以上就是小編整理的在Redis中使用分佈式鎖的原理,本質就是set或者setnx命令的使用,以及對應版本的加鎖以及解鎖操作。最後分析了RedisLockRegistry的具體實現,只是小編的個人理解,有不準確的地方請大家多多談論。

還請大家多多評論轉發收藏,讓更多的人看到獲益啊。順便給小編來一波關注,你們的支持就是小編最大的動力!!!


分享到:


相關文章: