引言
最近項目上線的頻率頗高,連著幾天加班熬夜,身體有點吃不消精神也有些萎靡,無奈業務方催的緊,工期就在眼前只能硬著頭皮上了。腦子渾渾噩噩的時候,寫的就不能叫代碼,可以直接叫做Bug。我就熬夜寫了一個bug被罵慘了。
由於是做商城業務,要頻繁的對商品庫存進行扣減,應用是集群部署,為避免併發造成庫存超買超賣等問題,採用 redis 分佈式鎖加以控制。本以為給扣庫存的代碼加上鎖lock.tryLock就萬事大吉了
<code>1
/**
2
*
@author
xiaofu
3
*
@description
扣減庫存
4
*
@date
2020
/4/21
12
:10
5
*/
6
public
String
stockLock()
{
7
RLock
lock
=
redissonClient.getLock("stockLock");
8
try
{
9
/**
10
*
獲取鎖
11
*/
12
if
(lock.tryLock(10,
TimeUnit.SECONDS))
{
13
/**
14
*
查詢庫存數
15
*/
16
Integer
stock
=
Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount"));
17
/**
18
*
扣減庫存
19
*/
20
if
(stock
>
0
)
{
21
stock
=
stock
-
1
;
22
stringRedisTemplate.opsForValue().set("stockCount",
stock.toString());
23
LOGGER.info("庫存扣減成功,剩餘庫存數量:{}",
stock);
24
}
else
{
25
LOGGER.info("庫存不足~");
26
}
27
}
else
{
28
LOGGER.info("未獲取到鎖業務結束..");
29
}
30
}
catch
(Exception
e)
{
31
LOGGER.info("處理異常",
e);
32
}
finally
{
33
lock.unlock();
34
}
35
return
"ok"
;
36
}
/<code>
結果業務代碼執行完以後我忘了釋放鎖lock.unlock(),導致redis線程池被打滿,redis服務大面積故障,造成庫存數據扣減混亂,被領導一頓臭罵,這個月績效~ 哎·~。
隨著 使用redis 鎖的時間越長,我發現 redis 鎖的坑遠比想象中要多。就算在面試題當中redis分佈式鎖的出鏡率也比較高,比如:“用鎖遇到過哪些問題?” ,“又是如何解決的?” 基本都是一套連招問出來的。
今天就分享一下我用redis 分佈式鎖的踩坑日記,以及一些解決方案,和大家一起共勉。
一、鎖未被釋放
這種情況是一種低級錯誤,就是我上邊犯的錯,由於當前線程 獲取到redis 鎖,處理完業務後未及時釋放鎖,導致其它線程會一直嘗試獲取鎖阻塞,例如:用Jedis客戶端會報如下的錯誤信息
<code>1redis
.clients
.jedis
.exceptions
.JedisConnectionException
:Could
not
get
a
resource
from
the
pool
/<code>
redis線程池已經沒有空閒線程來處理客戶端命令。
解決的方法也很簡單,只要我們細心一點,拿到鎖的線程處理完業務及時釋放鎖,如果是重入鎖未拿到鎖後,線程可以釋放當前連接並且sleep一段時間。
<code>1
public
void
lock()
{
2
while
(true)
{
3
boolean
flag
=
this.getLock(key);
4
if
(flag)
{
5
TODO
.........
6
}
else
{
7
//
釋放當前redis連接
8
redis.close();
9
//
休眠1000毫秒
10
sleep(1000);
11
}
12
}
13
}
/<code>
二、B的鎖被A給釋放了
我們知道Redis實現鎖的原理在於 SETNX命令。當 key不存在時將 key的值設為 value ,返回值為 1;若給定的 key已經存在,則 SETNX不做任何動作,返回值為 0 。
<code>1
SETNX keyvalue
/<code>
我們來設想一下這個場景:A、B兩個線程來嘗試給key myLock加鎖,A線程先拿到鎖(假如鎖3秒後過期),B線程就在等待嘗試獲取鎖,到這一點毛病沒有。
那如果此時業務邏輯比較耗時,執行時間已經超過redis鎖過期時間,這時A線程的鎖自動釋放(刪除key),B線程檢測到myLock這個key不存在,執行 SETNX命令也拿到了鎖。
但是,此時A線程執行完業務邏輯之後,還是會去釋放鎖(刪除key),這就導致B線程的鎖被A線程給釋放了。
為避免上邊的情況,一般我們在每個線程加鎖時要帶上自己獨有的value值來標識,只釋放指定value的key,否則就會出現釋放鎖混亂的場景。
三、數據庫事務超時
emm~ 聊redis鎖咋還扯到數據庫事務上來了?彆著急往下看,看下邊這段代碼:
<code>1
@Transaction
2
public
void
lock()
{
3
4
while
(true)
{
5
boolean
flag
=
this.getLock(key);
6
if
(flag)
{
7
insert();
8
}
9
}
10
}
/<code>
給這個方法添加一個@Transaction註解開啟事務,如代碼中拋出異常進行回滾,要知道數據庫事務可是有超時時間限制的,並不會無條件的一直等一個耗時的數據庫操作。
比如:我們解析一個大文件,再將數據存入到數據庫,如果執行時間太長,就會導致事務超時自動回滾。
一旦你的key長時間獲取不到鎖,獲取鎖等待的時間遠超過數據庫事務超時時間,程序就會報異常。
一般為解決這種問題,我們就需要將數據庫事務改為手動提交、回滾事務。
<code>1
@Autowired
2
DataSourceTransactionManager
dataSourceTransactionManager;
3
4
@Transaction
5
public
void
lock()
{
6
//手動開啟事務
7
TransactionStatus
transactionStatus
=
dataSourceTransactionManager.getTransaction(transactionDefinition);
8
try
{
9
while
(true)
{
10
boolean
flag
=
this.getLock(key);
11
if
(flag)
{
12
insert();
13
//手動提交事務
14
dataSourceTransactionManager.commit(transactionStatus);
15
}
16
}
17
}
catch
(Exception
e)
{
18
//手動回滾事務
19
dataSourceTransactionManager.rollback(transactionStatus);
20
}
21
}
/<code>
四、鎖過期了,業務還沒執行完
這種情況和我們上邊提到的第二種比較類似,但解決思路上略有不同。
同樣是redis分佈式鎖過期,而業務邏輯沒執行完的場景,不過,這裡換一種思路想問題,把redis鎖的過期時間再弄長點不就解決了嗎?
那還是有問題,我們可以在加鎖的時候,手動調長redis鎖的過期時間,可這個時間多長合適?業務邏輯的執行時間是不可控的,調的過長又會影響操作性能。
要是redis鎖的過期時間能夠自動續期就好了。
為了解決這個問題我們使用redis客戶端redisson,redisson很好的解決了redis在分佈式環境下的一些棘手問題,它的宗旨就是讓使用者減少對Redis的關注,將更多精力用在處理業務邏輯上。
redisson對分佈式鎖做了很好封裝,只需調用API即可。
<code>1 RLocklock
= redissonClient.getLock("stockLock"
);/<code>
redisson在加鎖成功後,會註冊一個定時任務監聽這個鎖,每隔10秒就去查看這個鎖,如果還持有鎖,就對過期時間進行續期。默認過期時間30秒。這個機制也被叫做:“看門狗”,這名字。。。
舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業務沒有執行完,就會進行一次續期,把鎖的過期時間再次重置成30秒。
通過分析下邊redisson的源碼實現可以發現,不管是加鎖、解鎖、續約都是客戶端把一些複雜的業務邏輯,通過封裝在Lua腳本中發送給redis,保證這段複雜業務邏輯執行的原子性。
<code>1
@Slf4j
2
@Service
3public
class
RedisDistributionLockPlus
{
4
5
/**
6
*
加鎖超時時間,單位毫秒,
即:加鎖時間內執行完操作,如果未完成會有並發現象
7
*/
8
private
static
final
long
DEFAULT_LOCK_TIMEOUT
=
30
;
9
10
private
static
final
long
TIME_SECONDS_FIVE
=
5
;
11
12
/**
13
*
每個key的過期時間
{@link
LockContent}
14
*/
15
private
Map
LockContent>
lockContentMap
=
new
ConcurrentHashMap<>(512);
16
17
/**
18
*
redis執行成功的返回
19
*/
20
private
static
final
Long
EXEC_SUCCESS
=
1L;
21
22
/**
23
*
獲取鎖lua腳本,
k1:獲鎖key,
k2:續約耗時key,
arg1:requestId,arg2:超時時間
24
*/
25
private
static
final
String
LOCK_SCRIPT
=
"if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end "
+
26
"if redis.call('exists', KEYS[1]) == 0 then "
+
27
"local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) "
+
28
"for k, v in pairs(t) do "
+
29
"if v == 'OK' then return tonumber(ARGV[2]) end "
+
30
"end "
+
31
"return 0 end"
;
32
33
/**
34
*
釋放鎖lua腳本,
k1:獲鎖key,
k2:續約耗時key,
arg1:requestId,arg2:業務耗時
arg3:
業務開始設置的timeout
35
*/
36
private
static
final
String
UNLOCK_SCRIPT
=
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+
37
"local ctime = tonumber(ARGV[2]) "
+
38
"local biz_timeout = tonumber(ARGV[3]) "
+
39
"if ctime > 0 then "
+
40
"if redis.call('exists', KEYS[2]) == 1 then "
+
41
"local avg_time = redis.call('get', KEYS[2]) "
+
42
"avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 "
+
43
"if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) "
+
44
"else redis.call('del', KEYS[2]) end "
+
45
"elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end "
+
46
"end "
+
47
"return redis.call('del', KEYS[1]) "
+
48
"else return 0 end"
;
49
/**
50
*
續約lua腳本
51
*/
52
private
static
final
String
RENEW_SCRIPT
=
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end"
;
53
54
55
private
final
StringRedisTemplate
redisTemplate;
56
57
public
RedisDistributionLockPlus(StringRedisTemplate
redisTemplate)
{
58
this.redisTemplate
=
redisTemplate;
59
ScheduleTask
task
=
new
ScheduleTask(this,
lockContentMap);
60
//
啟動定時任務
61
ScheduleExecutor.schedule(task,
1
,
1
,
TimeUnit.SECONDS);
62
}
63
64
/**
65
*
加鎖
66
*
取到鎖加鎖,取不到鎖一直等待知道獲得鎖
67
*
68
*
@param
lockKey
69
*
@param
requestId
全局唯一
70
*
@param
expire
鎖過期時間,
單位秒
71
*
@return
72
*/
73
public
boolean
lock(String
lockKey,
String
requestId,
long
expire)
{
74
log.info("開始執行加鎖,
lockKey
={},
requestId={}",
lockKey,
requestId);
75
for
(;
;
)
{
76
//
判斷是否已經有線程持有鎖,減少redis的壓力
77
LockContent
lockContentOld
=
lockContentMap.get(lockKey);
78
boolean
unLocked
=
null
==
lockContentOld;
79
//
如果沒有被鎖,就獲取鎖
80
if
(unLocked)
{
81
long
startTime
=
System.currentTimeMillis();
82
//
計算超時時間
83
long
bizExpire
=
expire
==
0L
?
DEFAULT_LOCK_TIMEOUT
:
expire;
84
String
lockKeyRenew
=
lockKey
+
"_renew"
;
85
86
RedisScript
script
=
RedisScript.of(LOCK_SCRIPT,
Long.class);
87
List
keys
=
new
ArrayList<>();
88
keys.add(lockKey);
89
keys.add(lockKeyRenew);
90
Long
lockExpire
=
redisTemplate.execute(script,
keys,
requestId,
Long.toString(bizExpire));
91
if
(null
!=
lockExpire
&&
lockExpire
>
0
)
{
92
//
將鎖放入map
93
LockContent
lockContent
=
new
LockContent();
94
lockContent.setStartTime(startTime);
95
lockContent.setLockExpire(lockExpire);
96
lockContent.setExpireTime(startTime
+
lockExpire
*
1000
);
97
lockContent.setRequestId(requestId);
98
lockContent.setThread(Thread.currentThread());
99
lockContent.setBizExpire(bizExpire);
100
lockContent.setLockCount(1);
101
lockContentMap.put(lockKey,
lockContent);
102
log.info("加鎖成功,
lockKey
={},
requestId={}",
lockKey,
requestId);
103
return
true
;
104
}
105
}
106
//
重複獲取鎖,在線程池中由於線程複用,線程相等並不能確定是該線程的鎖
107
if
(Thread.currentThread()
==
lockContentOld.getThread()
108
&&
requestId.equals(lockContentOld.getRequestId())){
109
//
計數
+1
110
lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
111
return
true
;
112
}
113
114
//
如果被鎖或獲取鎖失敗,則等待100毫秒
115
try
{
116
TimeUnit.MILLISECONDS.sleep(100);
117
}
catch
(InterruptedException
e)
{
118
//
這裡用lombok
有問題
119
log.error("獲取redis
鎖失敗,
lockKey
={},
requestId={}",
lockKey,
requestId,
e);
120
return
false
;
121
}
122
}
123
}
124
125
126
/**
127
*
解鎖
128
*
129
*
@param
lockKey
130
*
@param
lockValue
131
*/
132
public
boolean
unlock(String
lockKey,
String
lockValue)
{
133
String
lockKeyRenew
=
lockKey
+
"_renew"
;
134
LockContent
lockContent
=
lockContentMap.get(lockKey);
135
136
long
consumeTime;
137
if
(null
==
lockContent)
{
138
consumeTime
=
0L;
139
}
else
if
(lockValue.equals(lockContent.getRequestId()))
{
140
int
lockCount
=
lockContent.getLockCount();
141
//
每次釋放鎖,
計數
-1
,減到0時刪除redis上的key
142
if
(--lockCount
>
0
)
{
143
lockContent.setLockCount(lockCount);
144
return
false
;
145
}
146
consumeTime
=
(System.currentTimeMillis()
-
lockContent.getStartTime())
/
1000
;
147
}
else
{
148
log.info("釋放鎖失敗,不是自己的鎖。");
149
return
false
;
150
}
151
152
//
刪除已完成key,先刪除本地緩存,減少redis壓力,
分佈式鎖,只有一個,所以這裡不加鎖
153
lockContentMap.remove(lockKey);
154
155
RedisScript
script
=
RedisScript.of(UNLOCK_SCRIPT,
Long.class);
156
List
keys
=
new
ArrayList<>();
157
keys.add(lockKey);
158
keys.add(lockKeyRenew);
159
160
Long
result
=
redisTemplate.execute(script,
keys,
lockValue,
Long.toString(consumeTime),
161
Long.toString(lockContent.getBizExpire()));
162
return
EXEC_SUCCESS.equals(result);
163
164
}
165
166
/**
167
*
續約
168
*
169
*
@param
lockKey
170
*
@param
lockContent
171
*
@return
true
:續約成功,false:續約失敗(1、續約期間執行完成,鎖被釋放
2
、不是自己的鎖,3、續約期間鎖過期了(未解決))
172
*/
173
public
boolean
renew(String
lockKey,
LockContent
lockContent)
{
174
175
//
檢測執行業務線程的狀態
176
Thread.State
state
=
lockContent.getThread().getState();
177
if
(Thread.State.TERMINATED
==
state)
{
178
log.info("執行業務的線程已終止,不再續約
lockKey
={},
lockContent={}",
lockKey,
lockContent);
179
return
false
;
180
}
181
182
String
requestId
=
lockContent.getRequestId();
183
long
timeOut
=
(lockContent.getExpireTime()
-
lockContent.getStartTime())
/
1000
;
184
185
RedisScript
script
=
RedisScript.of(RENEW_SCRIPT,
Long.class);
186
List
keys
=
new
ArrayList<>();
187
keys.add(lockKey);
188
189
Long
result
=
redisTemplate.execute(script,
keys,
requestId,
Long.toString(timeOut));
190
log.info("續約結果,True成功,False失敗
lockKey
={},
result={}",
lockKey,
EXEC_SUCCESS.equals(result));
191
return
EXEC_SUCCESS.equals(result);
192
}
193
194
195
static
class
ScheduleExecutor
{
196
197
public
static
void
schedule(ScheduleTask
task,
long
initialDelay,
long
period,
TimeUnit
unit)
{
198
long
delay
=
unit.toMillis(initialDelay);
199
long
period_
=
unit.toMillis(period);
200
//
定時執行
201
new
Timer("Lock-Renew-Task").schedule(task,
delay,
period_);
202
}
203
}
204
205
static
class
ScheduleTask
extends
TimerTask
{
206
207
private
final
RedisDistributionLockPlus
redisDistributionLock;
208
private
final
Map
LockContent>
lockContentMap;
209
210
public
ScheduleTask(RedisDistributionLockPlus
redisDistributionLock,
Map
LockContent>
lockContentMap)
{
211
this.redisDistributionLock
=
redisDistributionLock;
212
this.lockContentMap
=
lockContentMap;
213
}
214
215
@Override
216
public
void
run()
{
217
if
(lockContentMap.isEmpty())
{
218
return;
219
}
220
Set
LockContent>>
entries
=
lockContentMap.entrySet();
221
for
(Map.Entry
LockContent>
entry
:
entries)
{
222
String
lockKey
=
entry.getKey();
223
LockContent
lockContent
=
entry.getValue();
224
long
expireTime
=
lockContent.getExpireTime();
225
//
減少線程池中任務數量
226
if
((expireTime
-
System.currentTimeMillis())/
1000
<
TIME_SECONDS_FIVE)
{
227
//線程池異步續約
228
ThreadPool.submit(()
->
{
229
boolean
renew
=
redisDistributionLock.renew(lockKey,
lockContent);
230
if
(renew)
{
231
long
expireTimeNew
=
lockContent.getStartTime()
+
(expireTime
-
lockContent.getStartTime())
*
2
-
TIME_SECONDS_FIVE
*
1000
;
232
lockContent.setExpireTime(expireTimeNew);
233
}
else
{
234
//
續約失敗,說明已經執行完
OR
redis
出現問題
235
lockContentMap.remove(lockKey);
236
}
237
});
238
}
239
}
240
}
241
}
242
}
/<code>
五、redis主從複製的坑
redis高可用最常見的方案就是主從複製(master-slave),這種模式也給redis分佈式鎖挖了一坑。
redis cluster集群環境下,假如現在A客戶端想要加鎖,它會根據路由規則選擇一臺master節點寫入key mylock,在加鎖成功後,master節點會把key異步複製給對應的slave節點。
如果此時redis master節點宕機,為保證集群可用性,會進行主備切換,slave變為了redis master。B客戶端在新的master節點上加鎖成功,而A客戶端也以為自己還是成功加了鎖的。
此時就會導致同一時間內多個客戶端對一個分佈式鎖完成了加鎖,導致各種髒數據的產生。
至於解決辦法嘛,目前看還沒有什麼根治的方法,只能儘量保證機器的穩定性,減少發生此事件的概率。
總結
上面就是我在使用Redis 分佈式鎖時遇到的一些坑,有點小感慨,經常用一個方法填上這個坑,沒多久就發現另一個坑又出來了,其實根本沒有什麼十全十美的解決方案,哪有什麼銀彈,只不過是在權衡利弊後,選一個在接受範圍內的折中方案而已。
感悟
從正式成為一名程序員的那天起,註定要進行沒有止境的學習,想要進階高級或者專家,就要堅持每天都高效的學習,不要給自己的懶惰找借,這次我給你整理好了,我看你還有啥理由!
私信回覆【666】送你