共計 15541 個字符,預計需要花費 39 分鐘才能閱讀完成。
這篇文章主要介紹“如何理解分布式系統下基于 Redis 的分布式鎖”,在日常操作中,相信很多人在如何理解分布式系統下基于 Redis 的分布式鎖問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何理解分布式系統下基于 Redis 的分布式鎖”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
新接手的項目,偶爾會出現賬不平的問題。之前的技術老大臨走時給的解釋是:排查了,沒找到原因,之后太忙就沒再解決,可能是框架的原因……
既然項目交付到手中,這樣的問題是必須要解決的。梳理了所有賬務處理邏輯,最終找到了原因:數據庫并發操作熱點賬戶導致。就這這個問題,來聊一聊分布式系統下基于 Redis 的分布式鎖。順便也分解一下問題形成原因及解決方案。
原因分析
系統并發量并不高,存在熱點賬戶,但也不至于那么嚴重。問題的根源在于系統架構設計,人為的制造了并發。場景是這樣的:商戶批量導入一批數據,系統會進行前置處理,并對賬戶余額進行增減。
此時,另外一個定時任務,也會對賬戶進行掃描更新。而且對同一賬戶的操作分布到各個系統當中,熱點賬戶也就出現了。
針對此問題的解決方案,從架構層面可以考慮將賬務系統進行抽離,集中在一個系統中進行處理,所有的數據庫事務及執行順序由賬務系統來統籌處理。從技術方面來講,則可以通過鎖機制來對熱點賬戶進行加鎖。
本篇文章就針對熱點賬戶基于分布式鎖的實現方式進行詳細的講解。
鎖的分析
在 Java 的多線程環境下,通常有幾類鎖可以使用:
JVM 內存模型級別的鎖,常用的有:synchronized、Lock 等;
數據庫鎖,比如樂觀鎖,悲觀鎖等;
分布式鎖;
JVM 內存級別的鎖,可以保證單體服務下線程的安全性,比如多個線程訪問 / 修改一個全局變量。但當系統進行集群部署時,JVM 級別的本地鎖就無能為力了。
悲觀鎖與樂觀鎖
像上述案例中,熱點賬戶就屬于分布式系統中的共享資源,我們通常會采用數據庫鎖或分布式鎖來進行解決。
數據庫鎖,又分為樂觀鎖和悲觀鎖。
悲觀鎖是基于數據庫(Mysql 的 InnoDB)提供的排他鎖來實現的。在進行事務操作時,通過 select … for update 語句,MySQL 會對查詢結果集中每行數據都添加排他鎖,其他線程對該記錄的更新與刪除操作都會阻塞。從而達到共享資源的順序執行(修改);
樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設數據一般情況不會造成沖突,所以在數據進行提交更新的時候,才會正式對數據的沖突與否進行檢測。如果沖突則返回給用戶異常信息,讓用戶決定如何去做。樂觀鎖適用于讀多寫少的場景,這樣可以提高程序的吞吐量。在樂觀鎖實現時通常會基于記錄狀態或添加 version 版本來進行實現。
悲觀鎖失效場景
項目中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區,下面來分析一下。
正常使用悲觀鎖的流程:
通過 select … for update 鎖定記錄;
計算新余額,修改金額并存儲;
執行完成釋放鎖;
經常犯錯的處理流程:
查詢賬戶余額,計算新余額;
通過 select … for update 鎖定記錄;
修改金額并存儲;
執行完成釋放鎖;
錯誤的流程中,比如 A 和 B 服務查詢到的余額都是 100,A 扣減 50,B 扣減 40,然后 A 鎖定記錄,更新數據庫為 50;A 釋放鎖之后,B 鎖定記錄,更新數據庫為 60。顯然,后者把前者的更新給覆蓋掉了。解決的方案就是擴大鎖的范圍,將鎖提前到計算新余額之前。
通常悲觀鎖對數據庫的壓力是非常大的,在實踐中通常會根據場景使用樂觀鎖或分布式鎖等方式來實現。
下面進入正題,講講基于 Redis 的分布式鎖實現。
Redis 分布式鎖實戰演習
這里以 Spring Boot、Redis、Lua 腳本為例來演示分布式鎖的實現。為了簡化處理,示例中 Redis 既承擔了分布式鎖的功能,也承擔了數據庫的功能。
場景構建
集群環境下,對同一個賬戶的金額進行操作,基本步驟:
從數據庫讀取用戶金額;
程序修改金額;
再將最新金額存儲到數據庫;
下面從最初不加鎖,不同步處理,逐步推演出最終的分布式鎖。
基礎集成及類構建
準備一個不加鎖處理的基礎業務環境。
首先在 Spring Boot 項目中引入相關依賴:
dependency
groupId org.springframework.boot /groupId
artifactId spring-boot-starter-data-redis /artifactId
/dependency
dependency
groupId org.springframework.boot /groupId
artifactId spring-boot-starter-web /artifactId
/dependency
賬戶對應實體類 UserAccount:
public class UserAccount {
// 用戶 ID
private String userId;
// 賬戶內金額
private int amount;
// 添加賬戶金額
public void addAmount(int amount) {
this.amount = this.amount + amount;
}
// 省略構造方法和 getter/setter
}
創建一個線程實現類 AccountOperationThread:
public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);
private static final Long RELEASE_SUCCESS = 1L;
private String userId;
private RedisTemplate Object, Object redisTemplate;
public AccountOperationThread(String userId, RedisTemplate Object, Object redisTemplate) {
this.userId = userId;
this.redisTemplate = redisTemplate;
}
@Override
public void run() { noLock();
}
/**
* 不加鎖
*/
private void noLock() {
try { Random random = new Random();
// 模擬線程進行業務處理
TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
} catch (InterruptedException e) { e.printStackTrace();
}
// 模擬數據庫中獲取用戶賬號
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
// 金額 +1
userAccount.addAmount(1);
logger.info(Thread.currentThread().getName() + : user id : + userId + amount : + userAccount.getAmount());
// 模擬存回數據庫
redisTemplate.opsForValue().set(userId, userAccount);
}
}
其中 RedisTemplate 的實例化交給了 Spring Boot:
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate Object, Object redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate Object, Object redisTemplate = new RedisTemplate ();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer Object jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer (Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 設置 value 的序列化規則和 key 的序列化規則
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
最后,再準備一個 TestController 來進行觸發多線程的運行:
@RestController
public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class);
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
@Autowired
private RedisTemplate Object, Object redisTemplate;
@GetMapping(/test)
public String test() throws InterruptedException {
// 初始化用戶 user_001 到 Redis,賬戶金額為 0
redisTemplate.opsForValue().set( user_001 , new UserAccount( user_001 , 0));
// 開啟 10 個線程進行同步測試,每個線程為賬戶增加 1 元
for (int i = 0; i 10; i++) { logger.info( 創建線程 i = + i);
executorService.execute(new AccountOperationThread( user_001 , redisTemplate));
}
// 主線程休眠 1 秒等待線程跑完
TimeUnit.MILLISECONDS.sleep(1000);
// 查詢 Redis 中的 user_001 賬戶
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get( user_001
logger.info(user id : + userAccount.getUserId() + amount : + userAccount.getAmount());
return success
}
}
執行上述程序,正常來說 10 個線程,每個線程加 1,結果應該是 10。但多執行幾次,會發現,結果變化很大,基本上都要比 10 小。
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
以上述日志為例,前四個線程都將值改為 1,也就是后面三個線程都將前面的修改進行了覆蓋,導致最終結果不是 10,只有 5。這顯然是有問題的。
Redis 同步鎖實現
針對上面的情況,在同一個 JVM 當中,我們可以通過線程加鎖來完成。但在分布式環境下,JVM 級別的鎖是沒辦法實現的,這里可以采用 Redis 同步鎖實現。
基本思路:第一個線程進入時,在 Redis 中進記錄,當后續線程過來請求時,判斷 Redis 是否存在該記錄,如果存在則說明處于鎖定狀態,進行等待或返回。如果不存在,則進行后續業務處理。
/**
* 1. 搶占資源時判斷是否被鎖。 * 2. 如未鎖則搶占成功且加鎖,否則等待鎖釋放。 * 3. 業務完成后釋放鎖, 讓給其它線程。 * p
* 該方案并未解決同步問題,原因:線程獲得鎖和加鎖的過程,并非原子性操作,可能會導致線程 A 獲得鎖,還未加鎖時,線程 B 也獲得了鎖。 */
private void redisLock() { Random random = new Random();
try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
} catch (InterruptedException e) { e.printStackTrace();
}
while (true) { Object lock = redisTemplate.opsForValue().get(userId + :syn
if (lock == null) {
// 獲得鎖 - 加鎖 - 跳出循環
logger.info(Thread.currentThread().getName() + : 獲得鎖
redisTemplate.opsForValue().set(userId + :syn , lock
break;
}
try {
// 等待 500 毫秒重試獲得鎖
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) { e.printStackTrace();
}
}
try {
// 模擬數據庫中獲取用戶賬號
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
if (userAccount != null) {
// 設置金額
userAccount.addAmount(1);
logger.info(Thread.currentThread().getName() + : user id : + userId + amount : + userAccount.getAmount());
// 模擬存回數據庫
redisTemplate.opsForValue().set(userId, userAccount);
}
} finally {
// 釋放鎖
redisTemplate.delete(userId + :syn
logger.info(Thread.currentThread().getName() + : 釋放鎖
}
}
在 while 代碼塊中,先判斷對應用戶 ID 是否在 Redis 中存在,如果不存在,則進行 set 加鎖,如果存在,則跳出循環繼續等待。
上述代碼,看起來實現了加鎖的功能,但當執行程序時,會發現與未加鎖一樣,依舊存在并發問題。原因是:獲取鎖和加鎖的操作并不是原子的。比如兩個線程發現 lock 都是 null,都進行了加鎖,此時并發問題依舊存在。
Redis 原子性同步鎖
針對上述問題,可將獲取鎖和加鎖的過程原子化處理。基于 spring-boot-data-redis 提供的原子化 API 可以實現:
// 該方法使用了 redis 的指令:SETNX key value
// 1.key 不存在,設置成功返回 value,setIfAbsent 返回 true;// 2.key 存在,則設置失敗返回 null,setIfAbsent 返回 false;// 3. 原子性操作;Boolean setIfAbsent(K var1, V var2);
上述方法的原子化操作是對 Redis 的 setnx 命令的封裝,在 Redis 中 setnx 的使用如下實例:
redis SETNX mykey Hello
(integer) 1
redis SETNX mykey World
(integer) 0
redis GET mykey
Hello
第一次,設置 mykey 時,并不存在,則返回 1,表示設置成功;第二次設置 mykey 時,已經存在,則返回 0,表示設置失敗。再次查詢 mykey 對應的值,會發現依舊是第一次設置的值。也就是說 redis 的 setnx 保證了唯一的 key 只能被一個服務設置成功。
理解了上述 API 及底層原理,來看看線程中的實現方法代碼如下:
/**
* 1. 原子操作加鎖
* 2. 競爭線程循環重試獲得鎖
* 3. 業務完成釋放鎖
*/
private void atomicityRedisLock() {
//Spring data redis 支持的原子性操作
while (!redisTemplate.opsForValue().setIfAbsent(userId + :syn , lock)) {
try {
// 等待 100 毫秒重試獲得鎖
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) { e.printStackTrace();
}
}
logger.info(Thread.currentThread().getName() + : 獲得鎖
try {
// 模擬數據庫中獲取用戶賬號
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
if (userAccount != null) {
// 設置金額
userAccount.addAmount(1);
logger.info(Thread.currentThread().getName() + : user id : + userId + amount : + userAccount.getAmount());
// 模擬存回數據庫
redisTemplate.opsForValue().set(userId, userAccount);
}
} finally {
// 釋放鎖
redisTemplate.delete(userId + :syn
logger.info(Thread.currentThread().getName() + : 釋放鎖
}
}
再次執行代碼,會發現結果正確了,也就是說可以成功的對分布式線程進行了加鎖。
Redis 分布式鎖的死鎖
雖然上述代碼執行結果沒問題,但如果應用異常宕機,沒來得及執行 finally 中釋放鎖的方法,那么其他線程則永遠無法獲得這個鎖。
此時可采用 setIfAbsent 的重載方法:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
基于該方法,可以設置鎖的過期時間。這樣即便獲得鎖的線程宕機,在 Redis 中數據過期之后,其他線程可正常獲得該鎖。
示例代碼如下:
private void atomicityAndExRedisLock() {
try {
//Spring data redis 支持的原子性操作, 并設置 5 秒過期時間
while (!redisTemplate.opsForValue().setIfAbsent(userId + :syn ,
System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
// 等待 100 毫秒重試獲得鎖
logger.info(Thread.currentThread().getName() + : 嘗試循環獲取鎖
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + : 獲得鎖 --------
// 應用在這里宕機,進程退出,無法執行 finally;
Thread.currentThread().interrupt();
// 業務邏輯...
} catch (InterruptedException e) { e.printStackTrace();
} finally {
// 釋放鎖
if (!Thread.currentThread().isInterrupted()) {
redisTemplate.delete(userId + :syn
logger.info(Thread.currentThread().getName() + : 釋放鎖
}
}
}
業務超時及守護線程
上面添加了 Redis 所的超時時間,看似解決了問題,但又引入了新的問題。
比如,正常情況下線程 A 在 5 秒內可正常處理完業務,但偶發會出現超過 5 秒的情況。如果將超時時間設置為 5 秒,線程 A 獲得了鎖,但業務邏輯處理需要 6 秒。此時,線程 A 還在正常業務邏輯,線程 B 已經獲得了鎖。當線程 A 處理完時,有可能將線程 B 的鎖給釋放掉。
在上述場景中有兩個問題點:
第一,線程 A 和線程 B 可能會同時在執行,存在并發問題。
第二,線程 A 可能會把線程 B 的鎖給釋放掉,導致一系列的惡性循環。
當然,可以通過在 Redis 中設置 value 值來判斷鎖是屬于線程 A 還是線程 B。但仔細分析會發現,這個問題的本質是因為線程 A 執行業務邏輯耗時超出了鎖超時的時間。
那么就有兩個解決方案了:
第一,將超時時間設置的足夠長,確保業務代碼能夠在鎖釋放之前執行完成;
第二,為鎖添加守護線程,為將要過期釋放但未釋放的鎖增加時間;
第一種方式需要全行大多數情況下業務邏輯的耗時,進行超時時間的設定。
第二種方式,可通過如下守護線程的方式來動態增加鎖超時時間。
public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);
// 是否需要守護 主線程關閉則結束守護線程
private volatile boolean daemon = true;
// 守護鎖
private String lockKey;
private RedisTemplate Object, Object redisTemplate;
public DaemonThread(String lockKey, RedisTemplate Object, Object redisTemplate) {
this.lockKey = lockKey;
this.redisTemplate = redisTemplate;
}
@Override
public void run() {
try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
// 剩余有效期小于 1 秒則續命
if (time 1000) { logger.info( 守護進程: + Thread.currentThread().getName() + 延長鎖時間 5000 毫秒
redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
}
TimeUnit.MILLISECONDS.sleep(300);
}
logger.info( 守護進程: + Thread.currentThread().getName() + 關閉
} catch (InterruptedException e) { e.printStackTrace();
}
}
// 主線程主動調用結束
public void stop() {
daemon = false;
}
}
上述線程每隔 300 毫秒獲取一下 Redis 中鎖的超時時間,如果小于 1 秒,則延長 5 秒。當主線程調用關閉時,守護線程也隨之關閉。
主線程中相關代碼實現:
private void deamonRedisLock() {
// 守護線程
DaemonThread daemonThread = null;
//Spring data redis 支持的原子性操作, 并設置 5 秒過期時間
String uuid = UUID.randomUUID().toString();
String value = Thread.currentThread().getId() + : + uuid;
try { while (!redisTemplate.opsForValue().setIfAbsent(userId + :syn , value, 5000, TimeUnit.MILLISECONDS)) {
// 等待 100 毫秒重試獲得鎖
logger.info(Thread.currentThread().getName() + : 嘗試循環獲取鎖
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + : 獲得鎖 ----
// 開啟守護線程
daemonThread = new DaemonThread(userId + :syn , redisTemplate);
Thread thread = new Thread(daemonThread);
thread.start();
// 業務邏輯執行 10 秒...
TimeUnit.MILLISECONDS.sleep(10000);
} catch (InterruptedException e) { e.printStackTrace();
} finally {
// 釋放鎖 這里也需要原子操作, 今后通過 Redis + Lua 講
String result = (String) redisTemplate.opsForValue().get(userId + :syn
if (value.equals(result)) {
redisTemplate.delete(userId + :syn
logger.info(Thread.currentThread().getName() + : 釋放鎖 -----
}
// 關閉守護線程
if (daemonThread != null) { daemonThread.stop();
}
}
}
其中在獲得鎖之后,開啟守護線程,在 finally 中將守護線程關閉。
基于 Lua 腳本的實現
在上述邏輯中,我們是基于 spring-boot-data-redis 提供的原子化操作來保證鎖判斷和執行的原子化的。在非 Spring Boot 項目中,則可以基于 Lua 腳本來實現。
首先定義加鎖和解鎖的 Lua 腳本及對應的 DefaultRedisScript 對象,在 RedisConfig 配置類中添加如下實例化代碼:
@Configuration
public class RedisConfig {
//lock script
private static final String LOCK_SCRIPT = if redis.call(setnx ,KEYS[1],ARGV[1]) == 1 +
then redis.call(expire ,KEYS[1],ARGV[2]) +
return 1 +
else return 0 end
private static final String UNLOCK_SCRIPT = if redis.call(get , KEYS[1]) == ARGV[1] then return redis.call +
(del , KEYS[1]) else return 0 end
// ... 省略部分代碼
@Bean
public DefaultRedisScript Boolean lockRedisScript() { DefaultRedisScript Boolean defaultRedisScript = new DefaultRedisScript ();
defaultRedisScript.setResultType(Boolean.class);
defaultRedisScript.setScriptText(LOCK_SCRIPT);
return defaultRedisScript;
}
@Bean
public DefaultRedisScript Long unlockRedisScript() { DefaultRedisScript Long defaultRedisScript = new DefaultRedisScript ();
defaultRedisScript.setResultType(Long.class);
defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
return defaultRedisScript;
}
}
再通過在 AccountOperationThread 類中新建構造方法,將上述兩個對象傳入類中(省略此部分演示)。然后,就可以基于 RedisTemplate 來調用了,改造之后的代碼實現如下:
private void deamonRedisLockWithLua() {
// 守護線程
DaemonThread daemonThread = null;
//Spring data redis 支持的原子性操作, 并設置 5 秒過期時間
String uuid = UUID.randomUUID().toString();
String value = Thread.currentThread().getId() + : + uuid;
try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + :syn), value, 5)) {
// 等待 1000 毫秒重試獲得鎖
logger.info(Thread.currentThread().getName() + : 嘗試循環獲取鎖
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + : 獲得鎖 ----
// 開啟守護線程
daemonThread = new DaemonThread(userId + :syn , redisTemplate);
Thread thread = new Thread(daemonThread);
thread.start();
// 業務邏輯執行 10 秒...
TimeUnit.MILLISECONDS.sleep(10000);
} catch (InterruptedException e) { logger.error( 異常 , e);
} finally {
// 使用 Lua 腳本:先判斷是否是自己設置的鎖,再執行刪除
// key 存在, 當前值 = 期望值時, 刪除 key;key 存在, 當前值!= 期望值時, 返回 0;
Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + :syn), value);
logger.info(redis 解鎖:{} , RELEASE_SUCCESS.equals(result));
if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) {
// 關閉守護線程
daemonThread.stop();
logger.info(Thread.currentThread().getName() + : 釋放鎖 ---
}
}
}
}
其中 while 循環中加鎖和 finally 中的釋放鎖都是基于 Lua 腳本來實現了。
Redis 鎖的其他因素
除了上述實例,在使用 Redis 分布式鎖時,還可以考慮以下情況及方案。
Redis 鎖的不可重入
當線程在持有鎖的情況下再次請求加鎖,如果一個鎖支持一個線程多次加鎖,那么這個鎖就是可重入的。如果一個不可重入鎖被再次加鎖,由于該鎖已經被持有,再次加鎖會失敗。Redis 可通過對鎖進行重入計數,加鎖時加 1,解鎖時減 1,當計數歸 0 時釋放鎖。
可重入鎖雖然高效但會增加代碼的復雜性,這里就不舉例說明了。
等待鎖釋放
有的業務場景,發現被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然后去搶鎖。上述示例就屬于后者。針對等待鎖釋放也有兩種方案:
客戶端輪訓:當未獲得鎖時,等待一段時間再重新獲取,直到成功。上述示例就是基于這種方式實現的。這種方式的缺點也很明顯,比較耗費服務器資源,當并發量大時會影響服務器的效率。
使用 Redis 的訂閱發布功能:當獲取鎖失敗時,訂閱鎖釋放消息,獲取鎖成功后釋放時,發送釋放消息。
集群中的主備切換和腦裂
在 Redis 包含主從同步的集群部署方式中,如果主節點掛掉,從節點提升為主節點。如果客戶端 A 在主節點加鎖成功,指令還未同步到從節點,此時主節點掛掉,從節點升為主節點,新的主節點中沒有鎖的數據。這種情況下,客戶端 B 就可能加鎖成功,從而出現并發的場景。
當集群發生腦裂時,Redis master 節點跟 slave 節點和 sentinel 集群處于不同的網絡分區。sentinel 集群無法感知到 master 的存在,會將 slave 節點提升為 master 節點,此時就會存在兩個不同的 master 節點。從而也會導致并發問題的出現。Redis Cluster 集群部署方式同理。
到此,關于“如何理解分布式系統下基于 Redis 的分布式鎖”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!