redis 分佈式鎖的 5個坑,真是又大又深

引言

最近項目上線的頻率頗高,連著幾天加班熬夜,身體有點吃不消精神也有些萎靡,無奈業務方催的緊,工期就在眼前只能硬著頭皮上了。腦子渾渾噩噩的時候,寫的就不能叫代碼,可以直接叫做Bug。我就熬夜寫了一個bug被罵慘了。

由於是做商城業務,要頻繁的對商品庫存進行扣減,應用是集群部署,為避免併發造成庫存超買超賣等問題,採用 redis 分佈式鎖加以控制。本以為給扣庫存的代碼加上鎖lock.tryLock就萬事大吉了

<code> 

1

    

/**

2

     

*

 

@author

 

xiaofu

3

     

*

 

@description

 

扣減庫存

4

     

*

 

@date

 

2020

/4/21

 

12

:10

5

     

*/

6

   

public

 

String

 

stockLock()

 

{

7

        

RLock

 

lock

 

=

 

redissonClient.getLock("stockLock");

8

        

try

 

{

9

            

/**

10

             

*

 

獲取鎖

11

             

*/

12

            

if

 

(lock.tryLock(10,

 

TimeUnit.SECONDS))

 

{

13

                

/**

14

                 

*

 

查詢庫存數

15

                 

*/

16

                

Integer

 

stock

 

=

 

Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount"));

17

                

/**

18

                 

*

 

扣減庫存

19

                 

*/

20

                

if

 

(stock

 

>

 

0

)

 

{

21

                    

stock

 

=

 

stock

 

-

 

1

;

22

                    

stringRedisTemplate.opsForValue().set("stockCount",

 

stock.toString());

23

                    

LOGGER.info("庫存扣減成功,剩餘庫存數量:{}",

 

stock);

24

                

}

 

else

 

{

25

                    

LOGGER.info("庫存不足~");

26

                

}

27

            

}

 

else

 

{

28

                

LOGGER.info("未獲取到鎖業務結束..");

29

            

}

30

        

}

 

catch

 

(Exception

 

e)

 

{

31

            

LOGGER.info("處理異常",

 

e);

32

        

}

 

finally

 

{

33

            

lock.unlock();

34

        

}

35

        

return

 

"ok"

;

36

  

}

/<code>

結果業務代碼執行完以後我忘了釋放鎖lock.unlock(),導致redis線程池被打滿,redis服務大面積故障,造成庫存數據扣減混亂,被領導一頓臭罵,這個月績效~ 哎·~。

隨著 使用redis 鎖的時間越長,我發現 redis 鎖的坑遠比想象中要多。就算在面試題當中redis分佈式鎖的出鏡率也比較高,比如:“用鎖遇到過哪些問題?” ,“又是如何解決的?” 基本都是一套連招問出來的。

今天就分享一下我用redis 分佈式鎖的踩坑日記,以及一些解決方案,和大家一起共勉。

一、鎖未被釋放

這種情況是一種低級錯誤,就是我上邊犯的錯,由於當前線程 獲取到redis 鎖,處理完業務後未及時釋放鎖,導致其它線程會一直嘗試獲取鎖阻塞,例如:用Jedis客戶端會報如下的錯誤信息

<code>1

redis

.clients

.jedis

.exceptions

.JedisConnectionException

Could

 

not

 

get

 

a

 

resource

 

from

 

the

 

pool

/<code>

redis線程池已經沒有空閒線程來處理客戶端命令。

解決的方法也很簡單,只要我們細心一點,拿到鎖的線程處理完業務及時釋放鎖,如果是重入鎖未拿到鎖後,線程可以釋放當前連接並且sleep一段時間。

<code> 

1

  

public

 

void

 

lock()

 

{

2

      

while

 

(true)

 

{

3

          

boolean

 

flag

 

=

 

this.getLock(key);

4

          

if

 

(flag)

 

{

5

                

TODO

 

.........

6

          

}

 

else

 

{

7

                

//

 

釋放當前redis連接

8

                

redis.close();

9

                

//

 

休眠1000毫秒

10

                

sleep(1000);

11

          

}

12

        

}

13

    

}

/<code>

二、B的鎖被A給釋放了

我們知道Redis實現鎖的原理在於 SETNX命令。當 key不存在時將 key的值設為 value ,返回值為 1;若給定的 key已經存在,則 SETNX不做任何動作,返回值為 0 。

<code>

1

SETNX key 

value

/<code>

我們來設想一下這個場景:A、B兩個線程來嘗試給key myLock加鎖,A線程先拿到鎖(假如鎖3秒後過期),B線程就在等待嘗試獲取鎖,到這一點毛病沒有。

那如果此時業務邏輯比較耗時,執行時間已經超過redis鎖過期時間,這時A線程的鎖自動釋放(刪除key),B線程檢測到myLock這個key不存在,執行 SETNX命令也拿到了鎖。

但是,此時A線程執行完業務邏輯之後,還是會去釋放鎖(刪除key),這就導致B線程的鎖被A線程給釋放了。

為避免上邊的情況,一般我們在每個線程加鎖時要帶上自己獨有的value值來標識,只釋放指定value的key,否則就會出現釋放鎖混亂的場景。

三、數據庫事務超時

emm~ 聊redis鎖咋還扯到數據庫事務上來了?彆著急往下看,看下邊這段代碼:

<code> 

1

   

@Transaction

2

   

public

 

void

 

lock()

 

{

3

4

        

while

 

(true)

 

{

5

            

boolean

 

flag

 

=

 

this.getLock(key);

6

            

if

 

(flag)

 

{

7

                

insert();

8

            

}

9

        

}

10

    

}

/<code>

給這個方法添加一個@Transaction註解開啟事務,如代碼中拋出異常進行回滾,要知道數據庫事務可是有超時時間限制的,並不會無條件的一直等一個耗時的數據庫操作。

比如:我們解析一個大文件,再將數據存入到數據庫,如果執行時間太長,就會導致事務超時自動回滾。

一旦你的key長時間獲取不到鎖,獲取鎖等待的時間遠超過數據庫事務超時時間,程序就會報異常。

一般為解決這種問題,我們就需要將數據庫事務改為手動提交、回滾事務。

<code> 

1

    

@Autowired

2

    

DataSourceTransactionManager

 

dataSourceTransactionManager;

3

4

    

@Transaction

5

    

public

 

void

 

lock()

 

{

6

        

//手動開啟事務

7

        

TransactionStatus

 

transactionStatus

 

=

 

dataSourceTransactionManager.getTransaction(transactionDefinition);

8

        

try

 

{

9

            

while

 

(true)

 

{

10

                

boolean

 

flag

 

=

 

this.getLock(key);

11

                

if

 

(flag)

 

{

12

                    

insert();

13

                    

//手動提交事務

14

                    

dataSourceTransactionManager.commit(transactionStatus);

15

                

}

16

            

}

17

        

}

 

catch

 

(Exception

 

e)

 

{

18

            

//手動回滾事務

19

            

dataSourceTransactionManager.rollback(transactionStatus);

20

        

}

21

    

}

/<code>

四、鎖過期了,業務還沒執行完

這種情況和我們上邊提到的第二種比較類似,但解決思路上略有不同。

同樣是redis分佈式鎖過期,而業務邏輯沒執行完的場景,不過,這裡換一種思路想問題,把redis鎖的過期時間再弄長點不就解決了嗎?

那還是有問題,我們可以在加鎖的時候,手動調長redis鎖的過期時間,可這個時間多長合適?業務邏輯的執行時間是不可控的,調的過長又會影響操作性能。

要是redis鎖的過期時間能夠自動續期就好了。

為了解決這個問題我們使用redis客戶端redisson,redisson很好的解決了redis在分佈式環境下的一些棘手問題,它的宗旨就是讓使用者減少對Redis的關注,將更多精力用在處理業務邏輯上。

redisson對分佈式鎖做了很好封裝,只需調用API即可。

<code>1  RLock 

lock

 = redissonClient.getLock(

"stockLock"

);/<code>

redisson在加鎖成功後,會註冊一個定時任務監聽這個鎖,每隔10秒就去查看這個鎖,如果還持有鎖,就對過期時間進行續期。默認過期時間30秒。這個機制也被叫做:“看門狗”,這名字。。。

舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業務沒有執行完,就會進行一次續期,把鎖的過期時間再次重置成30秒。

通過分析下邊redisson的源碼實現可以發現,不管是加鎖、解鎖、續約都是客戶端把一些複雜的業務邏輯,通過封裝在Lua腳本中發送給redis,保證這段複雜業務邏輯執行的原子性。

<code>  

1

@Slf4j

2

@Service

3public

 

class

 

RedisDistributionLockPlus

 

{

4

5

    

/**

6

     

*

 

加鎖超時時間,單位毫秒,

 

即:加鎖時間內執行完操作,如果未完成會有並發現象

7

     

*/

8

    

private

 

static

 

final

 

long

 

DEFAULT_LOCK_TIMEOUT

 

=

 

30

;

9

10

    

private

 

static

 

final

 

long

 

TIME_SECONDS_FIVE

 

=

 

5

 

;

11

12

    

/**

13

     

*

 

每個key的過期時間

 

{@link

 

LockContent}

14

     

*/

15

    

private

 

Map 

LockContent>

 

lockContentMap

 

=

 

new

 

ConcurrentHashMap<>(512);

16

17

    

/**

18

     

*

 

redis執行成功的返回

19

     

*/

20

    

private

 

static

 

final

 

Long

 

EXEC_SUCCESS

 

=

 

1L;

21

22

    

/**

23

     

*

 

獲取鎖lua腳本,

 

k1:獲鎖key,

 

k2:續約耗時key,

 

arg1:requestId,arg2:超時時間

24

     

*/

25

    

private

 

static

 

final

 

String

 

LOCK_SCRIPT

 

=

 

"if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end "

 

+

26

            

"if redis.call('exists', KEYS[1]) == 0 then "

 

+

27

               

"local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) "

 

+

28

               

"for k, v in pairs(t) do "

 

+

29

                 

"if v == 'OK' then return tonumber(ARGV[2]) end "

 

+

30

               

"end "

 

+

31

            

"return 0 end"

;

32

33

    

/**

34

     

*

 

釋放鎖lua腳本,

 

k1:獲鎖key,

 

k2:續約耗時key,

 

arg1:requestId,arg2:業務耗時

 

arg3:

 

業務開始設置的timeout

35

     

*/

36

    

private

 

static

 

final

 

String

 

UNLOCK_SCRIPT

 

=

 

"if redis.call('get', KEYS[1]) == ARGV[1] then "

 

+

37

            

"local ctime = tonumber(ARGV[2]) "

 

+

38

            

"local biz_timeout = tonumber(ARGV[3]) "

 

+

39

            

"if ctime > 0 then  "

 

+

40

               

"if redis.call('exists', KEYS[2]) == 1 then "

 

+

41

                   

"local avg_time = redis.call('get', KEYS[2]) "

 

+

42

                   

"avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 "

 

+

43

                   

"if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) "

 

+

44

                   

"else redis.call('del', KEYS[2]) end "

 

+

45

               

"elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end "

 

+

46

            

"end "

 

+

47

            

"return redis.call('del', KEYS[1]) "

 

+

48

            

"else return 0 end"

;

49

    

/**

50

     

*

 

續約lua腳本

51

     

*/

52

    

private

 

static

 

final

 

String

 

RENEW_SCRIPT

 

=

 

"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end"

;

53

54

55

    

private

 

final

 

StringRedisTemplate

 

redisTemplate;

56

57

    

public

 

RedisDistributionLockPlus(StringRedisTemplate

 

redisTemplate)

 

{

58

        

this.redisTemplate

 

=

 

redisTemplate;

59

        

ScheduleTask

 

task

 

=

 

new

 

ScheduleTask(this,

 

lockContentMap);

60

        

//

 

啟動定時任務

61

        

ScheduleExecutor.schedule(task,

 

1

,

 

1

,

 

TimeUnit.SECONDS);

62

    

}

63

64

    

/**

65

     

*

 

加鎖

66

     

*

 

取到鎖加鎖,取不到鎖一直等待知道獲得鎖

67

     

*

68

     

*

 

@param

 

lockKey

69

     

*

 

@param

 

requestId

 

全局唯一

70

     

*

 

@param

 

expire

   

鎖過期時間,

 

單位秒

71

     

*

 

@return

72

     

*/

73

    

public

 

boolean

 

lock(String

 

lockKey,

 

String

 

requestId,

 

long

 

expire)

 

{

74

        

log.info("開始執行加鎖,

 

lockKey

 

={},

 

requestId={}",

 

lockKey,

 

requestId);

75

        

for

 

(;

 

;

 

)

 

{

76

            

//

 

判斷是否已經有線程持有鎖,減少redis的壓力

77

            

LockContent

 

lockContentOld

 

=

 

lockContentMap.get(lockKey);

78

            

boolean

 

unLocked

 

=

 

null

 

==

 

lockContentOld;

79

            

//

 

如果沒有被鎖,就獲取鎖

80

            

if

 

(unLocked)

 

{

81

                

long

 

startTime

 

=

 

System.currentTimeMillis();

82

                

//

 

計算超時時間

83

                

long

 

bizExpire

 

=

 

expire

 

==

 

0L

 

?

 

DEFAULT_LOCK_TIMEOUT

 

:

 

expire;

84

                

String

 

lockKeyRenew

 

=

 

lockKey

 

+

 

"_renew"

;

85

86

                

RedisScript

 

script

 

=

 

RedisScript.of(LOCK_SCRIPT,

 

Long.class);

87

                

List

 

keys

 

=

 

new

 

ArrayList<>();

88

                

keys.add(lockKey);

89

                

keys.add(lockKeyRenew);

90

                

Long

 

lockExpire

 

=

 

redisTemplate.execute(script,

 

keys,

 

requestId,

 

Long.toString(bizExpire));

91

                

if

 

(null

 

!=

 

lockExpire

 

&&

 

lockExpire

 

>

 

0

)

 

{

92

                    

//

 

將鎖放入map

93

                    

LockContent

 

lockContent

 

=

 

new

 

LockContent();

94

                    

lockContent.setStartTime(startTime);

95

                    

lockContent.setLockExpire(lockExpire);

96

                    

lockContent.setExpireTime(startTime

 

+

 

lockExpire

 

*

 

1000

);

97

                    

lockContent.setRequestId(requestId);

98

                    

lockContent.setThread(Thread.currentThread());

99

                    

lockContent.setBizExpire(bizExpire);

100

                    

lockContent.setLockCount(1);

101

                    

lockContentMap.put(lockKey,

 

lockContent);

102

                    

log.info("加鎖成功,

 

lockKey

 

={},

 

requestId={}",

 

lockKey,

 

requestId);

103

                    

return

 

true

;

104

                

}

105

            

}

106

            

//

 

重複獲取鎖,在線程池中由於線程複用,線程相等並不能確定是該線程的鎖

107

            

if

 

(Thread.currentThread()

 

==

 

lockContentOld.getThread()

108

                      

&&

 

requestId.equals(lockContentOld.getRequestId())){

109

                

//

 

計數

 

+1

110

                

lockContentOld.setLockCount(lockContentOld.getLockCount()+1);

111

                

return

 

true

;

112

            

}

113

114

            

//

 

如果被鎖或獲取鎖失敗,則等待100毫秒

115

            

try

 

{

116

                

TimeUnit.MILLISECONDS.sleep(100);

117

            

}

 

catch

 

(InterruptedException

 

e)

 

{

118

                

//

 

這裡用lombok

 

有問題

119

                

log.error("獲取redis

 

鎖失敗,

 

lockKey

 

={},

 

requestId={}",

 

lockKey,

 

requestId,

 

e);

120

                

return

 

false

;

121

            

}

122

        

}

123

    

}

124

125

126

    

/**

127

     

*

 

解鎖

128

     

*

129

     

*

 

@param

 

lockKey

130

     

*

 

@param

 

lockValue

131

     

*/

132

    

public

 

boolean

 

unlock(String

 

lockKey,

 

String

 

lockValue)

 

{

133

        

String

 

lockKeyRenew

 

=

 

lockKey

 

+

 

"_renew"

;

134

        

LockContent

 

lockContent

 

=

 

lockContentMap.get(lockKey);

135

136

        

long

 

consumeTime;

137

        

if

 

(null

 

==

 

lockContent)

 

{

138

            

consumeTime

 

=

 

0L;

139

        

}

 

else

 

if

 

(lockValue.equals(lockContent.getRequestId()))

 

{

140

            

int

 

lockCount

 

=

 

lockContent.getLockCount();

141

            

//

 

每次釋放鎖,

 

計數

 

-1

,減到0時刪除redis上的key

142

            

if

 

(--lockCount

 

>

 

0

)

 

{

143

                

lockContent.setLockCount(lockCount);

144

                

return

 

false

;

145

            

}

146

            

consumeTime

 

=

 

(System.currentTimeMillis()

 

-

 

lockContent.getStartTime())

 

/

 

1000

;

147

        

}

 

else

 

{

148

            

log.info("釋放鎖失敗,不是自己的鎖。");

149

            

return

 

false

;

150

        

}

151

152

        

//

 

刪除已完成key,先刪除本地緩存,減少redis壓力,

 

分佈式鎖,只有一個,所以這裡不加鎖

153

        

lockContentMap.remove(lockKey);

154

155

        

RedisScript

 

script

 

=

 

RedisScript.of(UNLOCK_SCRIPT,

 

Long.class);

156

        

List

 

keys

 

=

 

new

 

ArrayList<>();

157

        

keys.add(lockKey);

158

        

keys.add(lockKeyRenew);

159

160

        

Long

 

result

 

=

 

redisTemplate.execute(script,

 

keys,

 

lockValue,

 

Long.toString(consumeTime),

161

                

Long.toString(lockContent.getBizExpire()));

162

        

return

 

EXEC_SUCCESS.equals(result);

163

164

    

}

165

166

    

/**

167

     

*

 

續約

168

     

*

169

     

*

 

@param

 

lockKey

170

     

*

 

@param

 

lockContent

171

     

*

 

@return

 

true

:續約成功,false:續約失敗(1、續約期間執行完成,鎖被釋放

 

2

、不是自己的鎖,3、續約期間鎖過期了(未解決))

172

     

*/

173

    

public

 

boolean

 

renew(String

 

lockKey,

 

LockContent

 

lockContent)

 

{

174

175

        

//

 

檢測執行業務線程的狀態

176

        

Thread.State

 

state

 

=

 

lockContent.getThread().getState();

177

        

if

 

(Thread.State.TERMINATED

 

==

 

state)

 

{

178

            

log.info("執行業務的線程已終止,不再續約

 

lockKey

 

={},

 

lockContent={}",

 

lockKey,

 

lockContent);

179

            

return

 

false

;

180

        

}

181

182

        

String

 

requestId

 

=

 

lockContent.getRequestId();

183

        

long

 

timeOut

 

=

 

(lockContent.getExpireTime()

 

-

 

lockContent.getStartTime())

 

/

 

1000

;

184

185

        

RedisScript

 

script

 

=

 

RedisScript.of(RENEW_SCRIPT,

 

Long.class);

186

        

List

 

keys

 

=

 

new

 

ArrayList<>();

187

        

keys.add(lockKey);

188

189

        

Long

 

result

 

=

 

redisTemplate.execute(script,

 

keys,

 

requestId,

 

Long.toString(timeOut));

190

        

log.info("續約結果,True成功,False失敗

 

lockKey

 

={},

 

result={}",

 

lockKey,

 

EXEC_SUCCESS.equals(result));

191

        

return

 

EXEC_SUCCESS.equals(result);

192

    

}

193

194

195

    

static

 

class

 

ScheduleExecutor

 

{

196

197

        

public

 

static

 

void

 

schedule(ScheduleTask

 

task,

 

long

 

initialDelay,

 

long

 

period,

 

TimeUnit

 

unit)

 

{

198

            

long

 

delay

 

=

 

unit.toMillis(initialDelay);

199

            

long

 

period_

 

=

 

unit.toMillis(period);

200

            

//

 

定時執行

201

            

new

 

Timer("Lock-Renew-Task").schedule(task,

 

delay,

 

period_);

202

        

}

203

    

}

204

205

    

static

 

class

 

ScheduleTask

 

extends

 

TimerTask

 

{

206

207

        

private

 

final

 

RedisDistributionLockPlus

 

redisDistributionLock;

208

        

private

 

final

 

Map 

LockContent>

 

lockContentMap;

209

210

        

public

 

ScheduleTask(RedisDistributionLockPlus

 

redisDistributionLock,

 

Map 

LockContent>

 

lockContentMap)

 

{

211

            

this.redisDistributionLock

 

=

 

redisDistributionLock;

212

            

this.lockContentMap

 

=

 

lockContentMap;

213

        

}

214

215

        

@Override

216

        

public

 

void

 

run()

 

{

217

            

if

 

(lockContentMap.isEmpty())

 

{

218

                

return;

219

            

}

220

            

Set 

LockContent>>

 

entries

 

=

 

lockContentMap.entrySet();

221

            

for

 

(Map.Entry 

LockContent>

 

entry

 

:

 

entries)

 

{

222

                

String

 

lockKey

 

=

 

entry.getKey();

223

                

LockContent

 

lockContent

 

=

 

entry.getValue();

224

                

long

 

expireTime

 

=

 

lockContent.getExpireTime();

225

                

//

 

減少線程池中任務數量

226

                

if

 

((expireTime

 

-

 

System.currentTimeMillis())/

 

1000

 

<

 

TIME_SECONDS_FIVE)

 

{

227

                    

//線程池異步續約

228

                    

ThreadPool.submit(()

 

->

 

{

229

                        

boolean

 

renew

 

=

 

redisDistributionLock.renew(lockKey,

 

lockContent);

230

                        

if

 

(renew)

 

{

231

                            

long

 

expireTimeNew

 

=

 

lockContent.getStartTime()

 

+

 

(expireTime

 

-

 

lockContent.getStartTime())

 

*

 

2

 

-

 

TIME_SECONDS_FIVE

 

*

 

1000

;

232

                            

lockContent.setExpireTime(expireTimeNew);

233

                        

}

 

else

 

{

234

                            

//

 

續約失敗,說明已經執行完

 

OR

 

redis

 

出現問題

235

                            

lockContentMap.remove(lockKey);

236

                        

}

237

                    

});

238

                

}

239

            

}

240

        

}

241

    

}

242

}

/<code>

五、redis主從複製的坑

redis高可用最常見的方案就是主從複製(master-slave),這種模式也給redis分佈式鎖挖了一坑。

redis cluster集群環境下,假如現在A客戶端想要加鎖,它會根據路由規則選擇一臺master節點寫入key mylock,在加鎖成功後,master節點會把key異步複製給對應的slave節點。

如果此時redis master節點宕機,為保證集群可用性,會進行主備切換,slave變為了redis master。B客戶端在新的master節點上加鎖成功,而A客戶端也以為自己還是成功加了鎖的。

此時就會導致同一時間內多個客戶端對一個分佈式鎖完成了加鎖,導致各種髒數據的產生。

至於解決辦法嘛,目前看還沒有什麼根治的方法,只能儘量保證機器的穩定性,減少發生此事件的概率。

總結

上面就是我在使用Redis 分佈式鎖時遇到的一些坑,有點小感慨,經常用一個方法填上這個坑,沒多久就發現另一個坑又出來了,其實根本沒有什麼十全十美的解決方案,哪有什麼銀彈,只不過是在權衡利弊後,選一個在接受範圍內的折中方案而已。


感悟

從正式成為一名程序員的那天起,註定要進行沒有止境的學習,想要進階高級或者專家,就要堅持每天都高效的學習,不要給自己的懶惰找借,這次我給你整理好了,我看你還有啥理由!

私信回覆【666】送你

redis 分佈式鎖的 5個坑,真是又大又深

redis 分佈式鎖的 5個坑,真是又大又深


分享到:


相關文章: