Springboot-Redis分佈式鎖

作者:Mr---Dmy.oschina.net/dengfuwei/blog/1600681

隨著現在分佈式架構越來越盛行,在很多場景下需要使用到分佈式鎖。分佈式鎖的實現有很多種,比如基於數據庫、 zookeeper 等,本文主要介紹使用 Redis 做分佈式鎖的方式,並封裝成spring boot starter,方便使用

一. Redis 分佈式鎖的實現以及存在的問題

鎖是針對某個資源,保證其訪問的互斥性,在實際使用當中,這個資源一般是一個字符串。使用 Redis 實現鎖,主要是將資源放到 Redis 當中,利用其原子性,當其他線程訪問時,如果 Redis 中已經存在這個資源,就不允許之後的一些操作。spring boot使用 Redis 的操作主要是通過 RedisTemplate 來實現,一般步驟如下:

1.將鎖資源放入 Redis (注意是當key不存在時才能放成功,所以使用 setIfAbsent 方法):

redisTemplate.opsForValue().setIfAbsent("key", "value");

2.設置過期時間

redisTemplate.expire("key", 30000, TimeUnit.MILLISECONDS);

3.釋放鎖

redisTemplate.delete("key");

一般情況下,這樣的實現就能夠滿足鎖的需求了,但是如果在調用 setIfAbsent 方法之後線程掛掉了,即沒有給鎖定的資源設置過期時間,默認是永不過期,那麼這個鎖就會一直存在。所以需要保證設置鎖及其過期時間兩個操作的原子性,spring data的 RedisTemplate 當中並沒有這樣的方法。

但是在jedis當中是有這種原子操作的方法的,需要通過 RedisTemplate 的 execute 方法獲取到jedis裡操作命令的對象,代碼如下:

String result = redisTemplate.execute(new RedisCallback<string>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.set(key, "鎖定的資源", "NX", "PX", expire);
}
});
/<string>

注意: Redis 從2.6.12版本開始 set 命令支持 NX 、 PX 這些參數來達到 setnx 、 setex 、 psetex 命令的效果。

文檔參見: http://doc.redisfans.com/string/set.html

NX: 表示只有當鎖定資源不存在的時候才能 SET 成功。利用 Redis 的原子性,保證了只有第一個請求的線程才能獲得鎖,而之後的所有線程在鎖定資源被釋放之前都不能獲得鎖。

PX: expire 表示鎖定的資源的自動過期時間,單位是毫秒。具體過期時間根據實際場景而定

這樣在獲取鎖的時候就能夠保證設置 Redis 值和過期時間的原子性,避免前面提到的兩次 Redis 操作期間出現意外而導致的鎖不能釋放的問題。但是這樣還是可能會存在一個問題,考慮如下的場景順序:

  • 線程T1獲取鎖
  • 線程T1執行業務操作,由於某些原因阻塞了較長時間
  • 鎖自動過期,即鎖自動釋放了
  • 線程T2獲取鎖
  • 線程T1業務操作完畢,釋放鎖(其實是釋放的線程T2的鎖)

按照這樣的場景順序,線程T2的業務操作實際上就沒有鎖提供保護機制了。所以,每個線程釋放鎖的時候只能釋放自己的鎖,即鎖必須要有一個擁有者的標記,並且也需要保證釋放鎖的原子性操作。

因此在獲取鎖的時候,可以生成一個隨機不唯一的串放入當前線程中,然後再放入 Redis 。釋放鎖的時候先判斷鎖對應的值是否與線程中的值相同,相同時才做刪除操作。

Redis 從2.6.0開始通過內置的 Lua 解釋器,可以使用 EVAL 命令對 Lua 腳本進行求值,文檔參見: http://doc.redisfans.com/script/eval.html

因此我們可以通過 Lua 腳本來達到釋放鎖的原子操作,定義 Lua 腳本如下:

if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else

return 0
end

具體意思可以參考上面提供的文檔地址

使用 RedisTemplate 執行的代碼如下:

// 使用Lua腳本刪除Redis中匹配value的key,可以避免由於方法執行時間過長而redis鎖自動過期失效的時候誤刪其他線程的鎖
// spring自帶的執行腳本方法中,集群模式直接拋出不支持執行腳本的異常,所以只能拿到原redis的connection來執行腳本
Long result = redisTemplate.execute(new RedisCallback<long>() {
public Long doInRedis(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
// 集群模式和單機模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
// 單機模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
return 0L;
}
});
/<long>

代碼中分為集群模式和單機模式,並且兩者的方法、參數都一樣,原因是spring封裝的執行腳本的方法中( RedisConnection 接口繼承於 RedisScriptingCommands 接口的 eval 方法),集群模式的方法直接拋出了不支持執行腳本的異常(雖然實際是支持的),所以只能拿到 Redis 的connection來執行腳本,而 JedisCluster 和 Jedis 中的方法又沒有實現共同的接口,所以只能分開調用。

spring封裝的集群模式執行腳本方法源碼:

# JedisClusterConnection.java
/**
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisScriptingCommands#eval(byte[], org.springframework.data.redis.connection.ReturnType, int, byte[][])
*/
@Override
public T eval(byte[]/> throw new InvalidDataAccessApiUsageException("Eval is not supported in cluster environment.");
}

至此,我們就完成了一個相對可靠的 Redis 分佈式鎖,但是,在集群模式的極端情況下,還是可能會存在一些問題,比如如下的場景順序( 本文暫時不深入開展 ):

  • 線程T1獲取鎖成功
  • Redis 的master節點掛掉,slave自動頂上
  • 線程T2獲取鎖,會從slave節點上去判斷鎖是否存在,由於Redis的master slave複製是異步的,所以此時線程T2可能成功獲取到鎖

為了可以以後擴展為使用其他方式來實現分佈式鎖,定義了接口和抽象類,所有的源碼如下:

# DistributedLock.java 頂級接口
/**
* @author fuwei.deng
* @date 2017年6月14日 下午3:11:05

* @version 1.0.0
*/
public interface DistributedLock {
public static final long TIMEOUT_MILLIS = 30000;
public static final int RETRY_TIMES = Integer.MAX_VALUE;
public static final long SLEEP_MILLIS = 500;
public boolean lock(String key);
public boolean lock(String key, int retryTimes);
public boolean lock(String key, int retryTimes, long sleepMillis);
public boolean lock(String key, long expire);
public boolean lock(String key, long expire, int retryTimes);
public boolean lock(String key, long expire, int retryTimes, long sleepMillis);
public boolean releaseLock(String key);
}


# AbstractDistributedLock.java 抽象類,實現基本的方法,關鍵方法由子類去實現
/**
* @author fuwei.deng
* @date 2017年6月14日 下午3:10:57
* @version 1.0.0
*/
public abstract class AbstractDistributedLock implements DistributedLock {
@Override
public boolean lock(String key) {
return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, int retryTimes) {
return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, int retryTimes, long sleepMillis) {
return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
}
@Override
public boolean lock(String key, long expire) {
return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, long expire, int retryTimes) {
return lock(key, expire, retryTimes, SLEEP_MILLIS);
}
}


# RedisDistributedLock.java Redis分佈式鎖的實現
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;
/**
* @author fuwei.deng
* @date 2017年6月14日 下午3:11:14
* @version 1.0.0
*/
public class RedisDistributedLock extends AbstractDistributedLock {
private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);
private RedisTemplate<object> redisTemplate;
private ThreadLocal<string> lockFlag = new ThreadLocal<string>();
public static final String UNLOCK_LUA;
static {
StringBuilder sb = new StringBuilder();
sb.append("if redis.call("get",KEYS[1]) == ARGV[1] ");
sb.append("then ");
sb.append(" return redis.call("del",KEYS[1]) ");
sb.append("else ");
sb.append(" return 0 ");
sb.append("end ");
UNLOCK_LUA = sb.toString();
}
public RedisDistributedLock(RedisTemplate<object> redisTemplate) {
super();
this.redisTemplate = redisTemplate;
}
@Override
public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
boolean result = setRedis(key, expire);
// 如果獲取鎖失敗,按照傳入的重試次數進行重試
while((!result) && retryTimes-- > 0){

try {
logger.debug("lock failed, retrying..." + retryTimes);
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
return false;
}
result = setRedis(key, expire);
}
return result;
}
private boolean setRedis(String key, long expire) {
try {
String result = redisTemplate.execute(new RedisCallback<string>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
String uuid = UUID.randomUUID().toString();
lockFlag.set(uuid);
return commands.set(key, uuid, "NX", "PX", expire);
}
});
return !StringUtils.isEmpty(result);
} catch (Exception e) {
logger.error("set redis occured an exception", e);
}
return false;
}
@Override
public boolean releaseLock(String key) {
// 釋放鎖的時候,有可能因為持鎖之後方法執行時間大於鎖的有效期,此時有可能已經被另外一個線程持有鎖,所以不能直接刪除
try {
List<string> keys = new ArrayList<string>();
keys.add(key);
List<string> args = new ArrayList<string>();
args.add(lockFlag.get());
// 使用lua腳本刪除redis中匹配value的key,可以避免由於方法執行時間過長而redis鎖自動過期失效的時候誤刪其他線程的鎖
// spring自帶的執行腳本方法中,集群模式直接拋出不支持執行腳本的異常,所以只能拿到原redis的connection來執行腳本

Long result = redisTemplate.execute(new RedisCallback<long>() {
public Long doInRedis(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
// 集群模式和單機模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
// 單機模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
return 0L;
}
});
return result != null && result > 0;
} catch (Exception e) {
logger.error("release lock occured an exception", e);
}
return false;
}
}
/<long>/<string>/<string>/<string>/<string>/<string>/<object>/<string>/<string>/<object>

二. 基於 AOP 的 Redis 分佈式鎖

在實際的使用過程中,分佈式鎖可以封裝好後使用在方法級別,這樣就不用每個地方都去獲取鎖和釋放鎖,使用起來更加方便。

首先定義個註解:

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author fuwei.deng

* @date 2017年6月14日 下午3:10:36
* @version 1.0.0
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {
/** 鎖的資源,redis的key*/
String value() default "default";
/** 持鎖時間,單位毫秒*/
long keepMills() default 30000;
/** 當獲取失敗時候動作*/
LockFailAction action() default LockFailAction.CONTINUE;
public enum LockFailAction{
/** 放棄 */
GIVEUP,
/** 繼續 */
CONTINUE;
}
/** 重試的間隔時間,設置GIVEUP忽略此項*/
long sleepMills() default 200;
/** 重試次數*/
int retryTimes() default 5;
}

裝配分佈式鎖的bean

import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock;
import com.itopener.lock.redis.spring.boot.autoconfigure.lock.RedisDistributedLock;
/**
* @author fuwei.deng
* @date 2017年6月14日 下午3:11:31
* @version 1.0.0
*/
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class DistributedLockAutoConfiguration {

@Bean
@ConditionalOnBean(RedisTemplate.class)
public DistributedLock redisDistributedLock(RedisTemplate<object> redisTemplate){
return new RedisDistributedLock(redisTemplate);
}
}
/<object>

定義切面(spring boot配置方式)

import java.lang.reflect.Method;
import java.util.Arrays;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock;
import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock.LockFailAction;
import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock;
/**
* @author fuwei.deng
* @date 2017年6月14日 下午3:11:22
* @version 1.0.0
*/
@Aspect
@Configuration
@ConditionalOnClass(DistributedLock.class)
@AutoConfigureAfter(DistributedLockAutoConfiguration.class)
public class DistributedLockAspectConfiguration {
private final Logger logger = LoggerFactory.getLogger(DistributedLockAspectConfiguration.class);
@Autowired
private DistributedLock distributedLock;
@Pointcut("@annotation(com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock)")
private void lockPoint(){
}
@Around("lockPoint()")
public Object around(ProceedingJoinPoint pjp) throws Throwable{
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
RedisLock redisLock = method.getAnnotation(RedisLock.class);
String key = redisLock.value();
if(StringUtils.isEmpty(key)){

Object[] args = pjp.getArgs();
key = Arrays.toString(args);
}
int retryTimes = redisLock.action().equals(LockFailAction.CONTINUE) ? redisLock.retryTimes() : 0;
boolean lock = distributedLock.lock(key, redisLock.keepMills(), retryTimes, redisLock.sleepMills());
if(!lock) {
logger.debug("get lock failed : " + key);
return null;
}
//得到鎖,執行方法,釋放鎖
logger.debug("get lock success : " + key);
try {
return pjp.proceed();
} catch (Exception e) {
logger.error("execute locked method occured an exception", e);
} finally {
boolean releaseResult = distributedLock.releaseLock(key);
logger.debug("release lock : " + key + (releaseResult ? " success" : " failed"));
}
return null;
}
}

spring boot starter還需要在 resources/META-INF 中添加 spring.factories 文件

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAutoConfiguration,\
com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAspectConfiguration

這樣封裝之後,使用spring boot開發的項目,直接依賴這個starter,就可以在方法上加 RedisLock 註解來實現分佈式鎖的功能了,當然如果需要自己控制,直接注入分佈式鎖的bean即可

@Autowired
private DistributedLock distributedLock;

如果需要使用其他的分佈式鎖實現,繼承 AbstractDistributedLock 後實現獲取鎖和釋放鎖的方法即可

最後,小編在金三銀四跳槽季,給大家分享一波書籍福利,希望對大家有所幫助。

Springboot-Redis分佈式鎖

轉發+關注。私信“書籍”即可獲取!感謝大家支持,你們的支持是我更新最大的動力!


分享到:


相關文章: