Distributed Lock Implementation, the Series (Part 2): Zookeeper


Preface

Following straight on from Distributed Lock Implementation (Part 1): Redis, this time we design and implement a distributed lock with Zookeeper, and then study the distributed-lock source code of the open-source client library Curator.

Design and Implementation

I. Basic Algorithm

1. Create an ephemeral sequential node under a given parent node.

2. Check whether the node just created has the smallest sequence number among all children of the parent.

3. If it does, the lock is acquired; otherwise, watch the child with the next-smaller sequence number, and when that node is deleted, try to acquire the lock again.

4. To unlock, delete the current node.

II. Key Points

Ephemeral sequential nodes

The key to implementing a distributed lock on Zookeeper is its ephemeral sequential node. Zookeeper has four node types (a small demo follows the list):

1. PERSISTENT: persists until explicitly deleted.

2. PERSISTENT_SEQUENTIAL: persistent, with a sequence number assigned by Zookeeper (ordering is guaranteed).

3. EPHEMERAL: ephemeral, deleted automatically when the client session ends.

4. EPHEMERAL_SEQUENTIAL: ephemeral, with a Zookeeper-assigned sequence number (ordering is guaranteed).
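Here is a small demo of these node types using ZkClient (a sketch only: it assumes a local Zookeeper at 127.0.0.1:2181, and the /demo path and class name are made up for illustration):

import org.I0Itec.zkclient.ZkClient;

public class NodeTypeDemo {

    public static void main(String[] args) {
        ZkClient zk = new ZkClient("127.0.0.1:2181", 5000);
        // PERSISTENT: survives until explicitly deleted (true = create parents as needed)
        zk.createPersistent("/demo", true);
        // EPHEMERAL_SEQUENTIAL: Zookeeper appends a monotonically increasing 10-digit suffix
        String first = zk.createEphemeralSequential("/demo/", null);
        String second = zk.createEphemeralSequential("/demo/", null);
        // prints something like /demo/0000000000 , /demo/0000000001
        System.out.println(first + " , " + second);
        // closing the session removes the two ephemeral nodes, but /demo itself stays
        zk.close();
    }
}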

III. Code Implementation

We build on the ZkClient client library; the native Zookeeper API would of course work too, in much the same way.

The Maven coordinates:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>

The code:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class MyDistributedLock {

    private ZkClient zkClient;
    private String name;
    private String currentLockPath;
    private CountDownLatch countDownLatch;

    private static final String PARENT_LOCK_PATH = "/distribute_lock";

    public MyDistributedLock(ZkClient zkClient, String name) {
        this.zkClient = zkClient;
        this.name = name;
    }

    // acquire the lock
    public void lock() {
        // create the parent node if it does not exist yet
        if (!zkClient.exists(PARENT_LOCK_PATH)) {
            try {
                // among concurrent threads, only one create will succeed
                zkClient.createPersistent(PARENT_LOCK_PATH);
            } catch (Exception ignored) {
            }
        }
        // create an ephemeral sequential node under the parent
        currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis());
        // check whether our node has the smallest sequence number
        checkMinNode(currentLockPath);
    }

    // release the lock
    public void unlock() {
        System.out.println("delete : " + currentLockPath);
        zkClient.delete(currentLockPath);
    }

    private boolean checkMinNode(String lockPath) {
        // list all children of the parent node
        List<String> children = zkClient.getChildren(PARENT_LOCK_PATH);
        Collections.sort(children);
        int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1));
        if (index == 0) {
            System.out.println(name + ":success");
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            return true;
        } else {
            // wait for the next-smaller node to be released
            String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1);
            waitForLock(waitPath);
            return false;
        }
    }

    private void waitForLock(String prev) {
        System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev);
        countDownLatch = new CountDownLatch(1);
        zkClient.subscribeDataChanges(prev, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("prev node is done");
                checkMinNode(currentLockPath);
            }
        });
        // the previous node may already be gone before the subscription took
        // effect; in that case there is nothing to wait for
        if (!zkClient.exists(prev)) {
            return;
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch = null;
    }
}

Locking

  1. zkClient.exists first checks whether the parent node exists, and creates it if not; Zookeeper guarantees that the create succeeds only once.
  2. zkClient.createEphemeralSequential creates an ephemeral sequential node under the parent, and we then check whether that node has the smallest sequence number in the directory. If so, the lock is acquired; otherwise we take the next-smaller node and watch it.
  3. waitForLock waits for the next-smaller node: subscribeDataChanges subscribes to that node's changes, and handleDataDeleted runs the checkMinNode check again.
  4. After subscribing, we check once more whether the watched node still exists, because it may have released the lock while the subscription was being registered; if it is already gone, there is no need to wait. The waiting itself is implemented with a CountDownLatch.

Unlocking

Unlocking simply deletes the current node via zkClient's delete.
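A minimal usage sketch (again assuming a local Zookeeper at 127.0.0.1:2181; the "worker-1" name is arbitrary); wrapping the critical section in try/finally ensures our node is deleted even if the business logic throws:

ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
MyDistributedLock lock = new MyDistributedLock(zkClient, "worker-1");

lock.lock();
try {
    // critical section: at most one client at a time gets here
} finally {
    lock.unlock();   // deletes our znode, which notifies the next waiter
}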

Test Case

We start multiple threads that lock and unlock, and check whether the output is properly ordered:

import org.I0Itec.zkclient.ZkClient;

public class MyDistributedLockTest {

    public static void main(String[] args) {
        ZkClient zk = new ZkClient("127.0.0.1:2181", 5 * 10000);
        for (int i = 0; i < 20; i++) {
            String name = "thread" + i;
            Thread thread = new Thread(() -> {
                MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name);
                myDistributedLock.lock();
                //try {
                //    Thread.sleep(1 * 1000);
                //} catch (InterruptedException e) {
                //    e.printStackTrace();
                //}
                myDistributedLock.unlock();
            });
            thread.start();
        }
    }
}

The output below shows that lock/unlock and the watch notifications all behave as expected under multiple threads:

thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006
thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005
thread3:success
delete : /distribute_lock2/0000000000
thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004
thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003
thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008
thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007
thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000
thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001
thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002
delete : /distribute_lock2/0000000001
prev node is done
thread8:success
delete : /distribute_lock2/0000000002
prev node is done
thread4:success
delete : /distribute_lock2/0000000003
prev node is done
thread7:success
delete : /distribute_lock2/0000000004
prev node is done
thread2:success
delete : /distribute_lock2/0000000005
prev node is done
thread6:success
delete : /distribute_lock2/0000000006
prev node is done
thread1:success
delete : /distribute_lock2/0000000007
prev node is done
thread5:success
delete : /distribute_lock2/0000000008
prev node is done
thread9:success
delete : /distribute_lock2/0000000009

Curator Source Code Analysis

I. Basic Usage

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
client.start();

InterProcessMutex lock = new InterProcessMutex(client, "/test");
try {
    lock.acquire();
    // business logic
} catch (Exception e) {
    e.printStackTrace();
} finally {
    lock.release();
}

  1. CuratorFrameworkFactory.newClient obtains the Zookeeper client, with retryPolicy specifying the retry strategy; the client is then started.
  2. Curator itself ships several lock implementations; here we take the reentrant lock InterProcessMutex as the example. lock.acquire() acquires the lock and lock.release() releases it; acquire is also overloaded with a wait-timeout parameter, sketched below.
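A sketch of the timed variant (the /test path and the 3-second timeout are arbitrary); instead of blocking indefinitely, it returns false when the lock cannot be obtained in time:

InterProcessMutex lock = new InterProcessMutex(client, "/test");
if (lock.acquire(3, TimeUnit.SECONDS)) {   // wait at most 3 seconds
    try {
        // business logic
    } finally {
        lock.release();
    }
} else {
    // failed to obtain the lock within the timeout
}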

II. Source Code Analysis

Locking

acquire simply delegates to internalLock, passing -1 as the wait time (wait indefinitely):

public void acquire() throws Exception {
    if (!this.internalLock(-1L, (TimeUnit) null)) {
        throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
    }
}

internalLock first handles reentrancy: a ConcurrentMap maps each thread to a lock record holding an atomic counter, and if the current thread already owns the lock the counter is simply incremented. Otherwise attemptLock does the real acquisition (a caller-side sketch of this counting follows the method below):

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}
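From the caller's perspective, the bookkeeping behaves like this (a sketch, assuming lock is an InterProcessMutex used by a single thread):

lock.acquire();   // first acquire: lock znode created, lockCount = 1
lock.acquire();   // same thread re-enters: no Zookeeper round-trip, lockCount = 2
lock.release();   // lockCount = 1, the znode is still held
lock.release();   // lockCount = 0, the znode is deleted and the lock is really released

attemptLock, called above for the non-reentrant case, is where the lock node actually gets created: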

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;

    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

createsTheLock just uses Curator's fluent API to create the ephemeral sequential node (withProtection prefixes the node name with a GUID, so that after a retry a create whose server response was lost can still be recognized and cleaned up):

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

internalLockLoop decides whether we hold the lock: inside, driver.getsTheLock checks whether our node is the smallest child of the lock directory. If it is, the lock has been acquired; otherwise a watch is put on previousSequencePath, the next-smaller node, and once the watch fires the remaining wait time is re-evaluated:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean haveTheLock = false;
    boolean doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            List<String> children = getSortedChildren();
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        }
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }

    return haveTheLock;
}
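The wait() calls above are woken by the watcher that LockInternals registers on the previous node; paraphrased from the Curator source (so treat the exact shape as approximate), it just synchronizes on the same monitor and calls notifyAll():

private final Watcher watcher = new Watcher()
{
    @Override
    public void process(WatchedEvent event)
    {
        // the watched (next-smaller) node changed: wake the thread blocked
        // in wait()/wait(millisToWait) so it can re-check the lock
        notifyFromWatcher();
    }
};

private synchronized void notifyFromWatcher()
{
    notifyAll();
}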

Unlocking

release is comparatively simple: it looks up the current thread's lock data in the map and throws if there is none. Otherwise the counter is atomically decremented: a value still above zero means the lock is held reentrantly and nothing more happens; a negative value is an error; and at exactly zero, releaseLock deletes the node and the entry is removed from the map.

public void release() throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }

    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}


Afterword

Redis and Zookeeper are currently the two mainstream ways to implement distributed locks. Compared with rolling your own, the designs of Redisson and Curator are more refined and well worth learning from.

A journey of a thousand miles is made step by step; a voyage of ten thousand miles depends on the compass. Keep at it.

