共計 4668 個字符,預計需要花費 12 分鐘才能閱讀完成。
這篇文章主要介紹“分布式鎖的原理及 Redis 怎么實現分布式鎖”,在日常操作中,相信很多人在分布式鎖的原理及 Redis 怎么實現分布式鎖問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”分布式鎖的原理及 Redis 怎么實現分布式鎖”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
一、分布式鎖基本原理
分布式鎖:滿足分布式系統或集群模式下多進程可見并且互斥的鎖。
分布式鎖應該滿足的條件:
可見性:多個線程都能看到相同的結果,注意:這個地方說的可見性并不是并發編程中指的內存可見性,只是說多個進程之間都能感知到變化的意思
互斥:互斥是分布式鎖的最基本的條件,使得程序串行執行
高可用:程序不易崩潰,時時刻刻都保證較高的可用性
高性能:由于加鎖本身就讓性能降低,所有對于分布式鎖本身需要他就較高的加鎖性能和釋放鎖性能
安全性:安全也是程序中必不可少的一環
常見的分布式鎖有三種:
Mysql:mysql 本身就帶有鎖機制,但是由于 mysql 性能本身一般,所以采用分布式鎖的情況下,其實使用 mysql 作為分布式鎖比較少見
Redis:redis 作為分布式鎖是非常常見的一種使用方式,現在企業級開發中基本都使用 redis 或者 zookeeper 作為分布式鎖,利用 setnx 這個方法,如果插入 key 成功,則表示獲得到了鎖,如果有人插入成功,其他人插入失敗則表示無法獲得到鎖,利用這套邏輯來實現分布式鎖
Zookeeper:zookeeper 也是企業級開發中較好的一個實現分布式鎖的方案
二、基于 Redis 實現分布式鎖
實現分布式鎖時需要實現的兩個基本方法:
獲取鎖:
互斥:確保只能有一個線程獲取鎖
非阻塞:嘗試一次,成功返回 true,失敗返回 false
釋放鎖:
手動釋放
超時釋放:獲取鎖時添加一個超時時間
基于 Redis 實現分布式鎖原理:
SET resource_name my_random_value NX PX 30000
resource_name:資源名稱,可根據不同的業務區分不同的鎖
my_random_value:隨機值,每個線程的隨機值都不同,用于釋放鎖時的校驗
NX:key 不存在時設置成功,key 存在則設置不成功
PX:自動失效時間,出現異常情況,鎖可以過期失效
利用 NX 的原子性,多個線程并發時,只有一個線程可以設置成功,設置成功表示獲得鎖,可以執行后續的業務處理;如果出現異常,過了鎖的有效期,鎖自動釋放;
版本一
1、定義 ILock 接口
public interface ILock extends AutoCloseable {
/**
* 嘗試獲取鎖
*
* @param timeoutSec 鎖持有的超時時間,過期后自動釋放
* @return true 代表獲取鎖成功;false 代表獲取鎖失敗
*/
boolean tryLock(long timeoutSec);
/**
* 釋放鎖
* @return
*/
void unLock();
}
2、基于 Redis 實現分布式鎖—RedisLock
public class SimpleRedisLock {
private final StringRedisTemplate stringRedisTemplate;
private final String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX = lock:
@Override
public boolean tryLock(long timeoutSec) {
// 獲取線程標識
String threadId = Thread.currentThread().getId();
// 獲取鎖
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
// 通過 del 刪除鎖
stringRedisTemplate.delete(KEY_PREFIX + name);
}
@Override
public void close() {
unLock();
}
}
鎖誤刪問題
問題說明:
持有鎖的線程 1 在鎖的內部出現了阻塞,這時鎖超時自動釋放,這時線程 2 嘗試獲得鎖,然后線程 2 在持有鎖執行過程中,線程 1 反應過來,繼續執行,走到了刪除鎖邏輯,此時就會把本應該屬于線程 2 的鎖進行刪除,這就是鎖誤刪的情況。
解決方案:
在存入鎖時,放入自己線程的標識,在刪除鎖時,判斷當前這把鎖的標識是不是自己存入的,如果是,則進行刪除,如果不是,則不進行刪除。
版本二:解決鎖誤刪問題
public class SimpleRedisLock {
private final StringRedisTemplate stringRedisTemplate;
private final String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX = lock:
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + -
@Override
public boolean tryLock(long timeoutSec) {
// 獲取線程標識
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
// 獲取線程標示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖中的標示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判斷標示是否一致
if(threadId.equals(id)) {
// 釋放鎖
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
@Override
public void close() {
unLock();
}
}
鎖釋放的原子性問題
問題分析:
上述釋放鎖的代碼依然存在鎖誤刪問題,當線程 1 獲取鎖中的線程標識,并根據標識判斷是自己的鎖,這時鎖到期自動釋放,恰好線程 2 嘗試獲取鎖,并拿到了鎖,此時線程 1 依然執行釋放鎖的操作,就導致誤刪了線程 2 持有的鎖。
原因在于,由 java 代碼實現的釋放鎖流程不是原子操作,存在線程安全問題。
解決方案:
Redis 提供了 Lua 腳本功能,在一個腳本中編寫多條 Redis 命令,可以確保多條命令執行時的原子性。
版本三:調用 Lua 腳本改造分布式鎖
public class SimpleRedisLock implements ILock {
private final StringRedisTemplate stringRedisTemplate;
private final String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX = lock:
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + -
@Override
public boolean tryLock(long timeoutSec) {
// 獲取線程標識
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
String script = if redis.call(get ,KEYS[1]) == ARGV[1] then\n +
return redis.call(del ,KEYS[1])\n +
else\n +
return 0\n +
end
// 通過執行 lua 腳本實現鎖刪除,可以校驗隨機值
RedisScript Boolean redisScript = RedisScript.of(script, Boolean.class);
stringRedisTemplate.execute(redisScript,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
@Override
public void close() {
unLock();
}
}
到此,關于“分布式鎖的原理及 Redis 怎么實現分布式鎖”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!