Baidu DLock Source Code Analysis

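Introduction

DLock is an efficient, highly reliable distributed lock implemented in Java. It stores locks in Redis, performs atomic lock operations through Lua scripts, implements leases on top of Redis key expiry, and provides a process-level lock contention model based on a variant of the CLH queue.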

The overall architecture of the component is shown in the figure below.

[Figure: DLock component architecture]

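Because the description on GitHub is rather brief, and to keep this article rigorous, I read through the whole source code first. Below is a short description of where each role sits in the component and what it does.

The CLH-like queue in this component is essentially a simplified version of the AQS Sync Queue. Because the lock is not entirely under the control of a single process, a waiting thread could end up never being woken. To solve this, a Retry Thread is introduced, which polls Redis to check whether the lock has become available. In a distributed environment, network partitions and network jitter are unavoidable; for a distributed lock this shows up as the lease renewal problem. DLock renews the lease ahead of time to make renewal as likely to succeed as possible, and this is the job of the Lease Thread. Finally, once the lock has been acquired, the Exclusive Thread identifies which thread inside the process holds it.

With a rough understanding of the roles in the figure, let's move on to the source code.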

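Source Code Analysis

This article is based on commit id 9b8a82a0da327c3a4dc7128ad0707d26802f3b43, the latest commit on the master branch at the time of writing (2017-10-07 15:51:25). Keep in mind that later versions may behave differently.

Directory Structure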

.
├── DistributedReentrantLock.java - AQS-style sync queue implementation
├── domain
│   ├── DLockConfig.java - lock configuration, e.g. key and lease time
│   ├── DLockEntity.java - lock entity, mainly the lock value (locker)
│   ├── DLockStatus.java - lock status
│   └── DLockType.java - lock type
├── exception
│   ├── DLockProcessException.java - parent class of RedisProcessException
│   ├── OptimisticLockingException.java - the Redis command ran correctly but the result is not what we expected
│   └── RedisProcessException.java - error while executing a Jedis call
├── jedis
│   └── JedisClient.java
├── processor
│   ├── DLockProcessor.java - lock operation interface, e.g. load, set, renew
│   └── impl
│       └── RedisLockProcessor.java - lock operation implementation
├── support
│   └── DLockGenerator.java - utility that combines DLockConfig and DistributedReentrantLock
└── utils
    ├── EnumUtils.java
    ├── NetUtils.java
    ├── ReflectionUtils.java
    └── ValuedEnum.java

Component Descriptions

DLockConfig - holder for the key and the lease time

public class DLockConfig implements Serializable {
    private static final long serialVersionUID = -1332663877601479136L;
    /**
     * Prefix for unique key generating
     */
    public static final String UK_PRE = "DLOCK";
    /**
     * Separator for unique key generating
     */
    public static final String UK_SP = "_";
    /**
     * Lock type represents a group lockTargets with the same type.
     * The type is divided by different business scenarios, kind of USER_LOCK, ORDER_LOCK, BATCH_PROCCESS_LOCK...
     */
    private final String lockType;
    /**
     * Lock target represents a real lock target. lockType: USER_LOCK, lockTarget should be the UserID.
     *
     * @see DLockEntity#locker
     */
    private final String lockTarget;
    /**
     * Lock unique key represents the minimum granularity of the lock.
     * The naming policy is $UK_PRE_$lockType_$lockTarget
     */
    private final String lockUniqueKey;
    /**
     * Lock lease duration
     */
    private final int lease;
    /**
     * Lock Lease time unit
     */
    private final TimeUnit leaseTimeUnit;

    /**
     * Constructor with lockType & lockTarget & leaseTime & leaseTimeUnit
     */
    public DLockConfig(String lockType, String lockTarget, int lease, TimeUnit leaseTimeUnit) {
        this.lockType = lockType;
        this.lockTarget = lockTarget;
        this.lockUniqueKey = UK_PRE + UK_SP + lockType + UK_SP + StringUtils.trimToEmpty(lockTarget);
        this.lease = lease;
        this.leaseTimeUnit = leaseTimeUnit;
    }

    // getters and setters omitted
}

This class defines how the key is built; the final output is lockUniqueKey. In other words, the Redis key is "DLOCK_" + lockType + "_" + StringUtils.trimToEmpty(lockTarget), while the Redis value is stored in DLockEntity.
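To make the naming policy concrete, here is a minimal sketch of the same concatenation. The class name LockKeySketch and the sample values are made up for illustration, and a plain null check plus String.trim() stands in for StringUtils.trimToEmpty so the snippet needs no third-party library.

```java
final class LockKeySketch {
    static final String UK_PRE = "DLOCK";
    static final String UK_SP = "_";

    // Mirrors the concatenation in the DLockConfig constructor.
    // Assumption: null becomes "" and surrounding whitespace is trimmed,
    // which is what StringUtils.trimToEmpty is expected to do.
    static String uniqueKey(String lockType, String lockTarget) {
        String target = lockTarget == null ? "" : lockTarget.trim();
        return UK_PRE + UK_SP + lockType + UK_SP + target;
    }

    public static void main(String[] args) {
        // Hypothetical lock type and user id.
        System.out.println(uniqueKey("USER_LOCK", " 10086 ")); // DLOCK_USER_LOCK_10086
    }
}
```

Two callers that pass the same lockType and lockTarget therefore compete for the same Redis key, which is what makes the key the minimum granularity of the lock.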

DLockEntity - lock metadata

public class DLockEntity implements Serializable, Cloneable {

    private static final long serialVersionUID = 8479390959137749786L;
    /**
     * Task status default as {@link DLockStatus#INITIAL}
     */
    private DLockStatus lockStatus = DLockStatus.INITIAL;
    /**
     * The server ip address that locked the task
     */
    private String locker;
    /**
     * Lock time for milliseconds
     */
    private Long lockTime = -1L;

    // getters and setters omitted
}

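The main purpose of this class is to hold the Redis value of the distributed lock, namely the locker attribute.

In the current version, lockStatus and lockTime serve no purpose (yes, I am that sure). In DLock's overall design, the original goal was to support both DB and Redis implementations, and these two attributes are reserved for that.

RedisLockProcessor - executor of Redis lock commands

Reading the lock

The read only wraps the locker returned by Redis; the other attributes are filled in by hand.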

    /**
     * Load by unique key. For redis implement, you can find locker & status from the result entity.
     *
     * @param uniqueKey key
     * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis}
     */
    @Override
    public DLockEntity load(String uniqueKey) throws RedisProcessException {
        // GET command
        String locker;
        try {
            locker = jedisClient.get(uniqueKey);
        } catch (Exception e) {
            LOGGER.warn("Exception occurred by GET command for key: {}", uniqueKey, e);
            throw new RedisProcessException("Exception occurred by GET command for key:" + uniqueKey, e);
        }
        if (locker == null) {
            return null;
        }
        // build entity
        DLockEntity lockEntity = new DLockEntity();
        lockEntity.setLocker(locker);
        lockEntity.setLockStatus(DLockStatus.PROCESSING);
        return lockEntity;
    }

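Setting the lock

The lock is contended with a single SET command: the NX option makes the write succeed only if the key does not exist yet, and the PX option sets the expiry in milliseconds.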

    /**
     * Update for lock using redis SET(NX, PX) command.
     *
     * @param newLock with locker in it
     * @param lockConfig
     * @throws RedisProcessException Redis command execute exception
     * @throws OptimisticLockingException the lock is hold by the other request.
     */
    @Override
    public void updateForLock(DLockEntity newLock, DLockConfig lockConfig)
            throws RedisProcessException, OptimisticLockingException {
        // SET(NX, PX) command
        String lockRes;
        try {
            lockRes = jedisClient.set(lockConfig.getLockUniqueKey(), newLock.getLocker(), SET_ARG_NOT_EXIST,
                    SET_ARG_EXPIRE, lockConfig.getMillisLease());
        } catch (Exception e) {
            LOGGER.warn("Exception occurred by SET(NX, PX) command for key: {}", lockConfig.getLockUniqueKey(), e);
            throw new RedisProcessException(
                    "Exception occurred by SET(NX, PX) command for key:" + lockConfig.getLockUniqueKey(), e);
        }
        if (!RES_OK.equals(lockRes)) {
            LOGGER.warn("Fail to get lock for key:{} ,locker={}", lockConfig.getLockUniqueKey(), newLock.getLocker());
            throw new OptimisticLockingException(
                    "Fail to get lock for key:" + lockConfig.getLockUniqueKey() + " ,locker=" + newLock.getLocker());
        }
    }
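The semantics that updateForLock relies on can be modelled without Redis. The sketch below is an in-memory stand-in, not DLock or Jedis code: the class SetNxPxSketch and its explicit nowMillis parameter are assumptions made so the behaviour is deterministic and easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

final class SetNxPxSketch {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Models SET key value NX PX lease: succeeds only when the key is absent
    // or its previous lease has already run out at 'nowMillis'.
    synchronized boolean setIfAbsent(String key, String value, long leaseMillis, long nowMillis) {
        Entry current = store.get(key);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false; // someone else still holds an unexpired lease
        }
        store.put(key, new Entry(value, nowMillis + leaseMillis));
        return true;
    }
}
```

With a 1000 ms lease taken at time 0, a second caller is rejected at time 500 and succeeds at time 1000. In DLock the rejected case is the one reported as OptimisticLockingException.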

Lock expiry / lease renewal

Lua is used here so that the check and the update run as one atomic operation.

    /**
     * Extend lease for lock with lua script.
     *
     * @param leaseLock with locker in it
     * @param lockConfig
     * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis}
     * @throws OptimisticLockingException if the lock is released or be hold by another one.
     */
    @Override
    public void expandLockExpire(DLockEntity leaseLock, DLockConfig lockConfig)
            throws RedisProcessException, OptimisticLockingException {
        // Expire if key is existed and equal with the specified value(locker).
        // pexpire works in milliseconds, expire works in seconds
        String leaseScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then "
                + " return redis.call('pexpire', KEYS[1], ARGV[2]); "
                + "else"
                + " return nil; "
                + "end; ";
        Object leaseRes;
        try {
            leaseRes = jedisClient.eval(leaseScript, Arrays.asList(lockConfig.getLockUniqueKey()),
                    Arrays.asList(leaseLock.getLocker(), lockConfig.getMillisLease() + ""));
        } catch (Exception e) {
            LOGGER.warn("Exception occurred by ExpandLease lua script for key: {}", lockConfig.getLockUniqueKey(), e);
            throw new RedisProcessException(
                    "Exception occurred by ExpandLease lua script for key:" + lockConfig.getLockUniqueKey(), e);
        }
        // null means lua return nil (the lock is released or be hold by the other request)
        if (leaseRes == null) {
            LOGGER.warn("Fail to lease for key:{} ,locker={}", lockConfig.getLockUniqueKey(), leaseLock.getLocker());
            throw new OptimisticLockingException(
                    "Fail to lease for key:" + lockConfig.getLockUniqueKey() + " ,locker=" + leaseLock.getLocker());
        }
    }

Releasing the lock

Lua is used here as well, for the same atomicity reason.

    /**
     * Release lock using lua script.
     *
     * @param currentLock with locker in it
     * @param lockConfig
     * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis}
     * @throws OptimisticLockingException if the lock is released or be hold by another one.
     */
    @Override
    public void updateForUnlock(DLockEntity currentLock, DLockConfig lockConfig)
            throws RedisProcessException, OptimisticLockingException {
        // Delete if key is existed and equal with the specified value(locker).
        String unlockScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then "
                + " return redis.call('del', KEYS[1]); "
                + "else "
                + " return nil; "
                + "end;";
        Object unlockRes;
        try {
            unlockRes = jedisClient.eval(unlockScript, Arrays.asList(lockConfig.getLockUniqueKey()),
                    Arrays.asList(currentLock.getLocker()));
        } catch (Exception e) {
            LOGGER.warn("Exception occurred by Unlock lua script for key: {}", lockConfig.getLockUniqueKey(), e);
            throw new RedisProcessException(
                    "Exception occurred by Unlock lua script for key:" + lockConfig.getLockUniqueKey(), e);
        }
        // null means lua return nil (the lock is released or be hold by the other request)
        if (unlockRes == null) {
            LOGGER.warn("Fail to unlock for key:{} ,locker={}", lockConfig.getLockUniqueKey(), currentLock.getLocker());
            throw new OptimisticLockingException("Fail to unlock for key:" + lockConfig.getLockUniqueKey()
                    + ",locker=" + currentLock.getLocker());
        }
    }
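Both Lua scripts follow the same shape: compare the stored value with the caller's locker, and only then touch the key. The following in-memory sketch shows why that comparison matters. OwnerCheckedLockSketch and its method names are invented for illustration, and a synchronized method plays the role that Redis script atomicity plays in the real code.

```java
import java.util.HashMap;
import java.util.Map;

final class OwnerCheckedLockSketch {
    private final Map<String, String> owners = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();

    synchronized void put(String key, String locker, long expiresAtMillis) {
        owners.put(key, locker);
        expiresAt.put(key, expiresAtMillis);
    }

    // Counterpart of the lease script: extend only if 'locker' still owns the key.
    synchronized boolean extendIfOwner(String key, String locker, long newExpiresAtMillis) {
        if (!locker.equals(owners.get(key))) {
            return false; // key is gone or held by someone else
        }
        expiresAt.put(key, newExpiresAtMillis);
        return true;
    }

    // Counterpart of the unlock script: delete only if 'locker' still owns the key.
    synchronized boolean deleteIfOwner(String key, String locker) {
        if (!locker.equals(owners.get(key))) {
            return false;
        }
        owners.remove(key);
        expiresAt.remove(key);
        return true;
    }

    synchronized Long expiryOf(String key) {
        return expiresAt.get(key);
    }
}
```

Without the ownership check, a holder whose lease had silently expired could delete or extend a lock that a different process has since acquired. A plain GET followed by DEL or PEXPIRE from the client would leave a window between the two commands, which is the gap the scripts close.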

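DistributedReentrantLock - a reentrant distributed lock

This class is the core of the DLock component. Like AQS, it contends for the lock in a lock-free way.

Its fields have the following meanings: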

    /**
     * Lock configuration (holds the redis key and the lease time)
     */
    private final DLockConfig lockConfig;
    /**
     * Lock processor (redis lock service/processor)
     */
    private final DLockProcessor lockProcessor;
    /**
     * Head of the wait queue, lazily initialized. Except for initialization, it is modified only via method setHead.
     * Note: If head exists, its waitStatus is guaranteed not to be CANCELLED. (head node of the sync queue)
     */
    private final AtomicReference<Node> head = new AtomicReference<>();
    /**
     * Tail of the wait queue, lazily initialized. Modified only via method enq to add new wait node.
     * (tail node of the sync queue)
     */
    private final AtomicReference<Node> tail = new AtomicReference<>();
    /**
     * The current owner of exclusive mode synchronization. (the thread that holds the lock)
     */
    private final AtomicReference<Thread> exclusiveOwnerThread = new AtomicReference<>();
    /**
     * Retry thread reference (used to detect the lock state across processes)
     */
    private final AtomicReference<RetryLockThread> retryLockRef = new AtomicReference<>();
    /**
     * Expand lease thread reference (automatic lease renewal thread)
     */
    private final AtomicReference<ExpandLockLeaseThread> expandLockRef = new AtomicReference<>();
    /**
     * Once a thread hold this lock, the thread can reentrant the lock.
     * This value represents the count of holding this lock. Default as 0 (reentrancy counter)
     */
    private final AtomicInteger holdCount = new AtomicInteger(0);

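AQS Sync Queue enqueue operation

Because a distributed lock spans processes, a thread that fails to get the lock in the current process first starts the RetryThread, so that the elements in this process's Sync Queue cannot be left without anyone to wake them, and then parks itself.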

    final void acquireQueued(final Node node) {
        for (; ; ) {
            final Node p = node.prev.get();
            if (p == head.get() && tryLock()) {
                head.set(node);
                p.next.set(null); // help GC
                break;
            }
            // if need, start retry thread
            // set to the current thread when the lock is acquired, reset to null when it is released
            if (exclusiveOwnerThread.get() == null) {
                // process-level lock: cross-process notification is impossible, so waiters are woken by polling
                startRetryThread();
            }
            // park current thread
            LockSupport.park(this);
        }
    }

The retry thread first runs after one tenth of the lease time, and from then on polls once every sixth of the lease time.

retryLockRef.compareAndSet(t, new RetryLockThread((int) (lockConfig.getMillisLease() / 10), (int) (lockConfig.getMillisLease() / 6)));

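As soon as it detects that the distributed lock has been released, it wakes up the head node of the Sync Queue so that it can compete for the lock.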
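The park-then-get-woken handshake between a queued thread and the retry thread can be shown in isolation. This is only a sketch: PollAndUnparkSketch is a made-up class, an AtomicBoolean stands in for "the Redis key no longer exists", and the main thread plays the retry thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

final class PollAndUnparkSketch {
    // Returns true if the parked waiter noticed the release and finished.
    static boolean demo() {
        AtomicBoolean lockFree = new AtomicBoolean(false); // stand-in for the remote lock state
        AtomicBoolean acquired = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // Re-check the condition after every wake-up: park() may return spuriously.
            while (!lockFree.get()) {
                LockSupport.park();
            }
            acquired.set(true);
        });
        waiter.start();

        // Role of the retry thread: observe the release, then wake the waiter.
        lockFree.set(true);
        LockSupport.unpark(waiter);

        try {
            waiter.join(5_000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return acquired.get();
    }
}
```

Setting the flag before calling unpark means the waiter cannot miss the release: it either sees the flag before parking, or it finds an unpark permit waiting and returns from park at once. The loop in acquireQueued has the same structure, with tryLock() as the re-checked condition.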

Acquiring the lock

    /**
     * Lock redis record through the atomic command Set(key, value, NX, PX, expireTime), only one request will success
     * while multiple concurrently requesting.
     */
    @Override
    public boolean tryLock() {
        // current thread can reentrant, and locked times add once
        // (reentrancy)
        if (Thread.currentThread() == this.exclusiveOwnerThread.get()) {
            this.holdCount.incrementAndGet();
            return true;
        }
        DLockEntity newLock = new DLockEntity();
        newLock.setLockTime(System.currentTimeMillis());
        // IP_ThreadId
        newLock.setLocker(generateLocker());
        newLock.setLockStatus(DLockStatus.PROCESSING);
        boolean locked = false;
        try {
            // get lock directly
            lockProcessor.updateForLock(newLock, lockConfig);
            locked = true;
        } catch (OptimisticLockingException | DLockProcessException e) {
            // NOPE. Retry in the next round.
        }
        if (locked) {
            // set exclusive thread
            this.exclusiveOwnerThread.set(Thread.currentThread());
            // locked times reset to one (reentrancy counter)
            this.holdCount.set(1);
            // shutdown retry thread (the lock is held, so no more retries are needed)
            shutdownRetryThread();
            // start the timer for expand lease time (start the automatic renewal thread)
            startExpandLockLeaseThread(newLock);
        }
        return locked;
    }

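Looking back at the architecture diagram above: the exclusive owner thread is set once the lock has been acquired. At that point the retry thread is no longer needed to watch the lock state, and the lease thread is started to renew the lease.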
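The reentrancy bookkeeping in tryLock() is independent of Redis and can be looked at on its own. In the sketch below, which is not DLock code, a compareAndSet on the owner reference stands in for the remote SET NX PX call; the class name ReentrantCountSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class ReentrantCountSketch {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final AtomicInteger holdCount = new AtomicInteger(0);

    boolean tryLock() {
        Thread current = Thread.currentThread();
        if (owner.get() == current) {
            holdCount.incrementAndGet(); // re-entry: no remote call needed
            return true;
        }
        // Stand-in for the remote acquisition; only one thread can win.
        if (owner.compareAndSet(null, current)) {
            holdCount.set(1);
            return true;
        }
        return false;
    }

    void unlock() {
        if (owner.get() != Thread.currentThread()) {
            throw new IllegalMonitorStateException();
        }
        if (holdCount.decrementAndGet() > 0) {
            return; // still held by an outer lock() call
        }
        owner.set(null); // the real code releases the remote lock at this point
    }

    int holdCount() {
        return holdCount.get();
    }
}
```

The owner check comes first, so a thread that already holds the lock never goes back to Redis; only the outermost unlock() actually gives the lock up.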

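The lease thread starts almost immediately (the delay argument in the snippet below is 1) and from then on renews the lease every 75% of the lease time.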

// set new expand lock thread
int retryInterval = (int) (lockConfig.getMillisLease() * 0.75);
expandLockRef.compareAndSet(t, new ExpandLockLeaseThread(lock, 1, retryInterval));
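To get a feel for the numbers, the timing expressions quoted in this article can be evaluated for a sample lease. The 30 second lease below is an arbitrary example value, and LeaseTimingSketch is a hypothetical helper, not part of DLock.

```java
final class LeaseTimingSketch {
    // Same expressions as in the RetryLockThread construction quoted earlier.
    static int retryFirstDelay(long leaseMillis) {
        return (int) (leaseMillis / 10);
    }

    static int retryInterval(long leaseMillis) {
        return (int) (leaseMillis / 6);
    }

    // Same expression as retryInterval in the ExpandLockLeaseThread snippet.
    static int renewInterval(long leaseMillis) {
        return (int) (leaseMillis * 0.75);
    }

    public static void main(String[] args) {
        long lease = 30_000L; // example lease of 30 s, chosen for illustration
        System.out.println(retryFirstDelay(lease)); // 3000
        System.out.println(retryInterval(lease));   // 5000
        System.out.println(renewInterval(lease));   // 22500
    }
}
```

With these values, each renewal is attempted when roughly a quarter of the lease (7500 ms here) is still left, assuming the previous renewal succeeded on time. That remaining quarter is the slack available to absorb network jitter before the key expires.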

Releasing the lock

    /**
     * Attempts to release this lock.
     *
     * If the current thread is the holder of this lock then the hold
     * count is decremented. If the hold count is now zero then the lock
     * is released. If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     * hold this lock
     */
    @Override
    public void unlock() throws IllegalMonitorStateException {
        // lock must be hold by current thread
        if (Thread.currentThread() != this.exclusiveOwnerThread.get()) {
            throw new IllegalMonitorStateException();
        }
        // lock is still be hold
        if (holdCount.decrementAndGet() > 0) {
            return;
        }
        // clear remote lock
        DLockEntity currentLock = new DLockEntity();
        currentLock.setLocker(generateLocker());
        currentLock.setLockStatus(DLockStatus.PROCESSING);
        try {
            // release remote lock (delete the current lock)
            lockProcessor.updateForUnlock(currentLock, lockConfig);
        } catch (OptimisticLockingException | DLockProcessException e) {
            // NOPE. Lock will deleted automatic after the expire time.
        } finally {
            // Release exclusive owner
            this.exclusiveOwnerThread.compareAndSet(Thread.currentThread(), null);
            // Shutdown expand thread
            shutdownExpandThread();
            // wake up the head node for compete lock
            unparkQueuedNode();
        }
    }

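After the lock is released, shutdownExpandThread() is called to interrupt the lease thread. The snippet below shows shutdownRetryThread(), which interrupts the retry thread in the same way; shutdownExpandThread() presumably follows the same pattern against expandLockRef.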

    /**
     * Shutdown retry thread
     */
    private void shutdownRetryThread() {
        RetryLockThread t = retryLockRef.get();
        if (t != null && t.isAlive()) {
            t.interrupt();
        }
    }

The lease thread sleeps by calling wait(), so interrupting it raises an InterruptedException. A flag variable is then set so that the task can stop gracefully.

    @Override
    public void run() {
        while (!shouldShutdown) {
            synchronized (sync) {
                try {
                    // first running, delay
                    if (firstRunning && delay > 0) {
                        firstRunning = false;
                        sync.wait(delay);
                    }
                    // execute task
                    execute();
                    // wait for interval
                    sync.wait(retryInterval);
                } catch (InterruptedException e) {
                    shouldShutdown = true;
                }
            }
        }
        // clear associated resources for implementations
        beforeShutdown();
    }
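The shutdown pattern above can be reduced to a few lines and tried out directly. The sketch below keeps the wait() plus flag structure but drops the first-run delay and the beforeShutdown() hook; InterruptibleLoopSketch, its counter and the startAndStop() helper are assumptions made for the demo.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class InterruptibleLoopSketch extends Thread {
    private final Object sync = new Object();
    private final long intervalMillis;
    private volatile boolean shouldShutdown = false;
    private final AtomicInteger runs = new AtomicInteger();

    InterruptibleLoopSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        while (!shouldShutdown) {
            synchronized (sync) {
                try {
                    runs.incrementAndGet();    // stand-in for execute()
                    sync.wait(intervalMillis); // sleep until the next round or an interrupt
                } catch (InterruptedException e) {
                    shouldShutdown = true;     // leave the loop on the next check
                }
            }
        }
    }

    // Starts the loop, interrupts it, and reports whether it terminated.
    static boolean startAndStop() {
        InterruptibleLoopSketch t = new InterruptibleLoopSketch(60_000L);
        t.start();
        t.interrupt();
        try {
            t.join(5_000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive();
    }
}
```

Even with a 60 second interval the thread stops promptly, because wait() throws InterruptedException as soon as the interrupt is seen instead of sleeping through the full interval.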

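After the lease thread is stopped, the head node of the Sync Queue is woken up and we are back to competing for the lock. That completes the walkthrough of DLock's core implementation.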

