久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何理解分布式系統下基于Redis的分布式鎖

589次閱讀
沒有評論

共計 15541 個字符,預計需要花費 39 分鐘才能閱讀完成。

這篇文章主要介紹“如何理解分布式系統下基于 Redis 的分布式鎖”,在日常操作中,相信很多人在如何理解分布式系統下基于 Redis 的分布式鎖問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何理解分布式系統下基于 Redis 的分布式鎖”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

新接手的項目,偶爾會出現賬不平的問題。之前的技術老大臨走時給的解釋是:排查了,沒找到原因,之后太忙就沒再解決,可能是框架的原因……

既然項目交付到手中,這樣的問題是必須要解決的。梳理了所有賬務處理邏輯,最終找到了原因:數據庫并發操作熱點賬戶導致。就這這個問題,來聊一聊分布式系統下基于 Redis 的分布式鎖。順便也分解一下問題形成原因及解決方案。

原因分析

系統并發量并不高,存在熱點賬戶,但也不至于那么嚴重。問題的根源在于系統架構設計,人為的制造了并發。場景是這樣的:商戶批量導入一批數據,系統會進行前置處理,并對賬戶余額進行增減。

此時,另外一個定時任務,也會對賬戶進行掃描更新。而且對同一賬戶的操作分布到各個系統當中,熱點賬戶也就出現了。

針對此問題的解決方案,從架構層面可以考慮將賬務系統進行抽離,集中在一個系統中進行處理,所有的數據庫事務及執行順序由賬務系統來統籌處理。從技術方面來講,則可以通過鎖機制來對熱點賬戶進行加鎖。

本篇文章就針對熱點賬戶基于分布式鎖的實現方式進行詳細的講解。

鎖的分析

在 Java 的多線程環境下,通常有幾類鎖可以使用:

JVM 內存模型級別的鎖,常用的有:synchronized、Lock 等;

數據庫鎖,比如樂觀鎖,悲觀鎖等;

分布式鎖;

JVM 內存級別的鎖,可以保證單體服務下線程的安全性,比如多個線程訪問 / 修改一個全局變量。但當系統進行集群部署時,JVM 級別的本地鎖就無能為力了。

悲觀鎖與樂觀鎖

像上述案例中,熱點賬戶就屬于分布式系統中的共享資源,我們通常會采用數據庫鎖或分布式鎖來進行解決。

數據庫鎖,又分為樂觀鎖和悲觀鎖。

悲觀鎖是基于數據庫(Mysql 的 InnoDB)提供的排他鎖來實現的。在進行事務操作時,通過 select … for update 語句,MySQL 會對查詢結果集中每行數據都添加排他鎖,其他線程對該記錄的更新與刪除操作都會阻塞。從而達到共享資源的順序執行(修改);

樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設數據一般情況不會造成沖突,所以在數據進行提交更新的時候,才會正式對數據的沖突與否進行檢測。如果沖突則返回給用戶異常信息,讓用戶決定如何去做。樂觀鎖適用于讀多寫少的場景,這樣可以提高程序的吞吐量。在樂觀鎖實現時通常會基于記錄狀態或添加 version 版本來進行實現。

悲觀鎖失效場景

項目中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區,下面來分析一下。

正常使用悲觀鎖的流程:

通過 select … for update 鎖定記錄;

計算新余額,修改金額并存儲;

執行完成釋放鎖;

經常犯錯的處理流程:

查詢賬戶余額,計算新余額;

通過 select … for update 鎖定記錄;

修改金額并存儲;

執行完成釋放鎖;

錯誤的流程中,比如 A 和 B 服務查詢到的余額都是 100,A 扣減 50,B 扣減 40,然后 A 鎖定記錄,更新數據庫為 50;A 釋放鎖之后,B 鎖定記錄,更新數據庫為 60。顯然,后者把前者的更新給覆蓋掉了。解決的方案就是擴大鎖的范圍,將鎖提前到計算新余額之前。

通常悲觀鎖對數據庫的壓力是非常大的,在實踐中通常會根據場景使用樂觀鎖或分布式鎖等方式來實現。

下面進入正題,講講基于 Redis 的分布式鎖實現。

Redis 分布式鎖實戰演習

這里以 Spring Boot、Redis、Lua 腳本為例來演示分布式鎖的實現。為了簡化處理,示例中 Redis 既承擔了分布式鎖的功能,也承擔了數據庫的功能。

場景構建

集群環境下,對同一個賬戶的金額進行操作,基本步驟:

從數據庫讀取用戶金額;

程序修改金額;

再將最新金額存儲到數據庫;

下面從最初不加鎖,不同步處理,逐步推演出最終的分布式鎖。

基礎集成及類構建

準備一個不加鎖處理的基礎業務環境。

首先在 Spring Boot 項目中引入相關依賴:

dependency 
  groupId org.springframework.boot /groupId 
  artifactId spring-boot-starter-data-redis /artifactId 
 /dependency 
 dependency 
  groupId org.springframework.boot /groupId 
  artifactId spring-boot-starter-web /artifactId 
 /dependency

賬戶對應實體類 UserAccount:

public class UserAccount {
 // 用戶 ID
 private String userId;
 // 賬戶內金額
 private int amount;
 // 添加賬戶金額
 public void addAmount(int amount) {
 this.amount = this.amount + amount;
 }
 //  省略構造方法和 getter/setter 
}

創建一個線程實現類 AccountOperationThread:

public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);
 private static final Long RELEASE_SUCCESS = 1L;
 private String userId;
 private RedisTemplate Object, Object  redisTemplate;
 public AccountOperationThread(String userId, RedisTemplate Object, Object  redisTemplate) {
 this.userId = userId;
 this.redisTemplate = redisTemplate;
 }
 @Override
 public void run() { noLock();
 }
 /**
 *  不加鎖
 */
 private void noLock() {
 try { Random random = new Random();
 //  模擬線程進行業務處理
 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
 } catch (InterruptedException e) { e.printStackTrace();
 }
 // 模擬數據庫中獲取用戶賬號
 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
 //  金額 +1
 userAccount.addAmount(1);
 logger.info(Thread.currentThread().getName() +   : user id :   + userId +   amount :   + userAccount.getAmount());
 // 模擬存回數據庫
 redisTemplate.opsForValue().set(userId, userAccount);
 }
}

其中 RedisTemplate 的實例化交給了 Spring Boot:

@Configuration
public class RedisConfig {
 @Bean
 public RedisTemplate Object, Object  redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate Object, Object  redisTemplate = new RedisTemplate ();
 redisTemplate.setConnectionFactory(redisConnectionFactory);
 Jackson2JsonRedisSerializer Object  jackson2JsonRedisSerializer =
 new Jackson2JsonRedisSerializer (Object.class);
 ObjectMapper objectMapper = new ObjectMapper();
 objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
 objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
 jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
 //  設置 value 的序列化規則和  key 的序列化規則
 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
 redisTemplate.setKeySerializer(new StringRedisSerializer());
 redisTemplate.afterPropertiesSet();
 return redisTemplate;
 }
}

最后,再準備一個 TestController 來進行觸發多線程的運行:

@RestController
public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class);
 private static ExecutorService executorService = Executors.newFixedThreadPool(10);
 @Autowired
 private RedisTemplate Object, Object  redisTemplate;
 @GetMapping(/test)
 public String test() throws InterruptedException {
 //  初始化用戶 user_001 到 Redis,賬戶金額為 0
 redisTemplate.opsForValue().set( user_001 , new UserAccount( user_001 , 0));
 //  開啟 10 個線程進行同步測試,每個線程為賬戶增加 1 元
 for (int i = 0; i   10; i++) { logger.info( 創建線程 i =  + i);
 executorService.execute(new AccountOperationThread( user_001 , redisTemplate));
 }
 //  主線程休眠 1 秒等待線程跑完
 TimeUnit.MILLISECONDS.sleep(1000);
 //  查詢 Redis 中的 user_001 賬戶
 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get( user_001 
 logger.info(user id :   + userAccount.getUserId() +   amount :   + userAccount.getAmount());
 return  success 
 }
}

執行上述程序,正常來說 10 個線程,每個線程加 1,結果應該是 10。但多執行幾次,會發現,結果變化很大,基本上都要比 10 小。

[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5

以上述日志為例,前四個線程都將值改為 1,也就是后面三個線程都將前面的修改進行了覆蓋,導致最終結果不是 10,只有 5。這顯然是有問題的。

Redis 同步鎖實現

針對上面的情況,在同一個 JVM 當中,我們可以通過線程加鎖來完成。但在分布式環境下,JVM 級別的鎖是沒辦法實現的,這里可以采用 Redis 同步鎖實現。

基本思路:第一個線程進入時,在 Redis 中進記錄,當后續線程過來請求時,判斷 Redis 是否存在該記錄,如果存在則說明處于鎖定狀態,進行等待或返回。如果不存在,則進行后續業務處理。

 /**
 * 1. 搶占資源時判斷是否被鎖。 * 2. 如未鎖則搶占成功且加鎖,否則等待鎖釋放。 * 3. 業務完成后釋放鎖, 讓給其它線程。 *  p 
 *  該方案并未解決同步問題,原因:線程獲得鎖和加鎖的過程,并非原子性操作,可能會導致線程 A 獲得鎖,還未加鎖時,線程 B 也獲得了鎖。 */
 private void redisLock() { Random random = new Random();
 try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
 } catch (InterruptedException e) { e.printStackTrace();
 }
 while (true) { Object lock = redisTemplate.opsForValue().get(userId +  :syn 
 if (lock == null) {
 //  獲得鎖  -   加鎖  -   跳出循環
 logger.info(Thread.currentThread().getName() +  : 獲得鎖 
 redisTemplate.opsForValue().set(userId +  :syn ,  lock 
 break;
 }
 try {
 //  等待 500 毫秒重試獲得鎖
 TimeUnit.MILLISECONDS.sleep(500);
 } catch (InterruptedException e) { e.printStackTrace();
 }
 }
 try {
 // 模擬數據庫中獲取用戶賬號
 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
 if (userAccount != null) {
 // 設置金額
 userAccount.addAmount(1);
 logger.info(Thread.currentThread().getName() +   : user id :   + userId +   amount :   + userAccount.getAmount());
 // 模擬存回數據庫
 redisTemplate.opsForValue().set(userId, userAccount);
 }
 } finally {
 // 釋放鎖
 redisTemplate.delete(userId +  :syn 
 logger.info(Thread.currentThread().getName() +  : 釋放鎖 
 }
 }

在 while 代碼塊中,先判斷對應用戶 ID 是否在 Redis 中存在,如果不存在,則進行 set 加鎖,如果存在,則跳出循環繼續等待。

上述代碼,看起來實現了加鎖的功能,但當執行程序時,會發現與未加鎖一樣,依舊存在并發問題。原因是:獲取鎖和加鎖的操作并不是原子的。比如兩個線程發現 lock 都是 null,都進行了加鎖,此時并發問題依舊存在。

Redis 原子性同步鎖

針對上述問題,可將獲取鎖和加鎖的過程原子化處理。基于 spring-boot-data-redis 提供的原子化 API 可以實現:

//  該方法使用了 redis 的指令:SETNX key value
// 1.key 不存在,設置成功返回 value,setIfAbsent 返回 true;// 2.key 存在,則設置失敗返回 null,setIfAbsent 返回 false;// 3. 原子性操作;Boolean setIfAbsent(K var1, V var2);

上述方法的原子化操作是對 Redis 的 setnx 命令的封裝,在 Redis 中 setnx 的使用如下實例:

redis  SETNX mykey  Hello 
(integer) 1
redis  SETNX mykey  World 
(integer) 0
redis  GET mykey
 Hello

第一次,設置 mykey 時,并不存在,則返回 1,表示設置成功;第二次設置 mykey 時,已經存在,則返回 0,表示設置失敗。再次查詢 mykey 對應的值,會發現依舊是第一次設置的值。也就是說 redis 的 setnx 保證了唯一的 key 只能被一個服務設置成功。

理解了上述 API 及底層原理,來看看線程中的實現方法代碼如下:

 /**
 * 1. 原子操作加鎖
 * 2. 競爭線程循環重試獲得鎖
 * 3. 業務完成釋放鎖
 */
 private void atomicityRedisLock() {
 //Spring data redis  支持的原子性操作
 while (!redisTemplate.opsForValue().setIfAbsent(userId +  :syn ,  lock)) {
 try {
 //  等待 100 毫秒重試獲得鎖
 TimeUnit.MILLISECONDS.sleep(100);
 } catch (InterruptedException e) { e.printStackTrace();
 }
 }
 logger.info(Thread.currentThread().getName() +  : 獲得鎖 
 try {
 // 模擬數據庫中獲取用戶賬號
 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
 if (userAccount != null) {
 // 設置金額
 userAccount.addAmount(1);
 logger.info(Thread.currentThread().getName() +   : user id :   + userId +   amount :   + userAccount.getAmount());
 // 模擬存回數據庫
 redisTemplate.opsForValue().set(userId, userAccount);
 }
 } finally {
 // 釋放鎖
 redisTemplate.delete(userId +  :syn 
 logger.info(Thread.currentThread().getName() +  : 釋放鎖 
 }
 }

再次執行代碼,會發現結果正確了,也就是說可以成功的對分布式線程進行了加鎖。

Redis 分布式鎖的死鎖

雖然上述代碼執行結果沒問題,但如果應用異常宕機,沒來得及執行 finally 中釋放鎖的方法,那么其他線程則永遠無法獲得這個鎖。

此時可采用 setIfAbsent 的重載方法:

Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);

基于該方法,可以設置鎖的過期時間。這樣即便獲得鎖的線程宕機,在 Redis 中數據過期之后,其他線程可正常獲得該鎖。

示例代碼如下:

private void atomicityAndExRedisLock() {
 try {
 //Spring data redis  支持的原子性操作, 并設置 5 秒過期時間
 while (!redisTemplate.opsForValue().setIfAbsent(userId +  :syn ,
 System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
 //  等待 100 毫秒重試獲得鎖
 logger.info(Thread.currentThread().getName() +  : 嘗試循環獲取鎖 
 TimeUnit.MILLISECONDS.sleep(1000);
 }
 logger.info(Thread.currentThread().getName() +  : 獲得鎖 -------- 
 //  應用在這里宕機,進程退出,無法執行  finally;
 Thread.currentThread().interrupt();
 //  業務邏輯...
 } catch (InterruptedException e) { e.printStackTrace();
 } finally {
 // 釋放鎖
 if (!Thread.currentThread().isInterrupted()) {
 redisTemplate.delete(userId +  :syn 
 logger.info(Thread.currentThread().getName() +  : 釋放鎖 
 }
 }
 }

業務超時及守護線程

上面添加了 Redis 所的超時時間,看似解決了問題,但又引入了新的問題。

比如,正常情況下線程 A 在 5 秒內可正常處理完業務,但偶發會出現超過 5 秒的情況。如果將超時時間設置為 5 秒,線程 A 獲得了鎖,但業務邏輯處理需要 6 秒。此時,線程 A 還在正常業務邏輯,線程 B 已經獲得了鎖。當線程 A 處理完時,有可能將線程 B 的鎖給釋放掉。

在上述場景中有兩個問題點:

第一,線程 A 和線程 B 可能會同時在執行,存在并發問題。

第二,線程 A 可能會把線程 B 的鎖給釋放掉,導致一系列的惡性循環。

當然,可以通過在 Redis 中設置 value 值來判斷鎖是屬于線程 A 還是線程 B。但仔細分析會發現,這個問題的本質是因為線程 A 執行業務邏輯耗時超出了鎖超時的時間。

那么就有兩個解決方案了:

第一,將超時時間設置的足夠長,確保業務代碼能夠在鎖釋放之前執行完成;

第二,為鎖添加守護線程,為將要過期釋放但未釋放的鎖增加時間;

第一種方式需要全行大多數情況下業務邏輯的耗時,進行超時時間的設定。

第二種方式,可通過如下守護線程的方式來動態增加鎖超時時間。

public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);
 //  是否需要守護   主線程關閉則結束守護線程
 private volatile boolean daemon = true;
 //  守護鎖
 private String lockKey;
 private RedisTemplate Object, Object  redisTemplate;
 public DaemonThread(String lockKey, RedisTemplate Object, Object  redisTemplate) {
 this.lockKey = lockKey;
 this.redisTemplate = redisTemplate;
 }
 @Override
 public void run() {
 try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
 //  剩余有效期小于 1 秒則續命
 if (time   1000) { logger.info( 守護進程:   + Thread.currentThread().getName() +    延長鎖時間  5000  毫秒 
 redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
 }
 TimeUnit.MILLISECONDS.sleep(300);
 }
 logger.info(  守護進程:   + Thread.currentThread().getName() +  關閉   
 } catch (InterruptedException e) { e.printStackTrace();
 }
 }
 //  主線程主動調用結束
 public void stop() {
 daemon = false;
 }
}

上述線程每隔 300 毫秒獲取一下 Redis 中鎖的超時時間,如果小于 1 秒,則延長 5 秒。當主線程調用關閉時,守護線程也隨之關閉。

主線程中相關代碼實現:

private void deamonRedisLock() {
 // 守護線程
 DaemonThread daemonThread = null;
 //Spring data redis  支持的原子性操作, 并設置 5 秒過期時間
 String uuid = UUID.randomUUID().toString();
 String value = Thread.currentThread().getId() +  :  + uuid;
 try { while (!redisTemplate.opsForValue().setIfAbsent(userId +  :syn , value, 5000, TimeUnit.MILLISECONDS)) {
 //  等待 100 毫秒重試獲得鎖
 logger.info(Thread.currentThread().getName() +  : 嘗試循環獲取鎖 
 TimeUnit.MILLISECONDS.sleep(1000);
 }
 logger.info(Thread.currentThread().getName() +  : 獲得鎖 ---- 
 //  開啟守護線程
 daemonThread = new DaemonThread(userId +  :syn , redisTemplate);
 Thread thread = new Thread(daemonThread);
 thread.start();
 //  業務邏輯執行 10 秒...
 TimeUnit.MILLISECONDS.sleep(10000);
 } catch (InterruptedException e) { e.printStackTrace();
 } finally {
 // 釋放鎖   這里也需要原子操作, 今后通過  Redis + Lua  講
 String result = (String) redisTemplate.opsForValue().get(userId +  :syn 
 if (value.equals(result)) {
 redisTemplate.delete(userId +  :syn 
 logger.info(Thread.currentThread().getName() +  : 釋放鎖 ----- 
 }
 // 關閉守護線程
 if (daemonThread != null) { daemonThread.stop();
 }
 }
 }

其中在獲得鎖之后,開啟守護線程,在 finally 中將守護線程關閉。

基于 Lua 腳本的實現

在上述邏輯中,我們是基于 spring-boot-data-redis 提供的原子化操作來保證鎖判斷和執行的原子化的。在非 Spring Boot 項目中,則可以基于 Lua 腳本來實現。

首先定義加鎖和解鎖的 Lua 腳本及對應的 DefaultRedisScript 對象,在 RedisConfig 配置類中添加如下實例化代碼:

@Configuration
public class RedisConfig {
 //lock script
 private static final String LOCK_SCRIPT =   if redis.call(setnx ,KEYS[1],ARGV[1]) == 1   +
   then redis.call(expire ,KEYS[1],ARGV[2])   +
   return 1   +
   else return 0 end  
 private static final String UNLOCK_SCRIPT =  if redis.call(get , KEYS[1]) == ARGV[1] then return redis.call  +
  (del , KEYS[1]) else return 0 end 
 // ...  省略部分代碼
 
 @Bean
 public DefaultRedisScript Boolean  lockRedisScript() { DefaultRedisScript Boolean  defaultRedisScript = new DefaultRedisScript ();
 defaultRedisScript.setResultType(Boolean.class);
 defaultRedisScript.setScriptText(LOCK_SCRIPT);
 return defaultRedisScript;
 }
 @Bean
 public DefaultRedisScript Long  unlockRedisScript() { DefaultRedisScript Long  defaultRedisScript = new DefaultRedisScript ();
 defaultRedisScript.setResultType(Long.class);
 defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
 return defaultRedisScript;
 }
}

再通過在 AccountOperationThread 類中新建構造方法,將上述兩個對象傳入類中(省略此部分演示)。然后,就可以基于 RedisTemplate 來調用了,改造之后的代碼實現如下:

 private void deamonRedisLockWithLua() {
 // 守護線程
 DaemonThread daemonThread = null;
 //Spring data redis  支持的原子性操作, 并設置 5 秒過期時間
 String uuid = UUID.randomUUID().toString();
 String value = Thread.currentThread().getId() +  :  + uuid;
 try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId +  :syn), value, 5)) {
 //  等待 1000 毫秒重試獲得鎖
 logger.info(Thread.currentThread().getName() +  : 嘗試循環獲取鎖 
 TimeUnit.MILLISECONDS.sleep(1000);
 }
 logger.info(Thread.currentThread().getName() +  : 獲得鎖 ---- 
 //  開啟守護線程
 daemonThread = new DaemonThread(userId +  :syn , redisTemplate);
 Thread thread = new Thread(daemonThread);
 thread.start();
 //  業務邏輯執行 10 秒...
 TimeUnit.MILLISECONDS.sleep(10000);
 } catch (InterruptedException e) { logger.error( 異常 , e);
 } finally {
 // 使用 Lua 腳本:先判斷是否是自己設置的鎖,再執行刪除
 // key 存在, 當前值 = 期望值時, 刪除 key;key 存在, 當前值!= 期望值時, 返回 0;
 Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId +  :syn), value);
 logger.info(redis 解鎖:{} , RELEASE_SUCCESS.equals(result));
 if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) {
 // 關閉守護線程
 daemonThread.stop();
 logger.info(Thread.currentThread().getName() +  : 釋放鎖 --- 
 }
 }
 }
 }

其中 while 循環中加鎖和 finally 中的釋放鎖都是基于 Lua 腳本來實現了。

Redis 鎖的其他因素

除了上述實例,在使用 Redis 分布式鎖時,還可以考慮以下情況及方案。

Redis 鎖的不可重入

當線程在持有鎖的情況下再次請求加鎖,如果一個鎖支持一個線程多次加鎖,那么這個鎖就是可重入的。如果一個不可重入鎖被再次加鎖,由于該鎖已經被持有,再次加鎖會失敗。Redis 可通過對鎖進行重入計數,加鎖時加 1,解鎖時減 1,當計數歸 0 時釋放鎖。

可重入鎖雖然高效但會增加代碼的復雜性,這里就不舉例說明了。

等待鎖釋放

有的業務場景,發現被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然后去搶鎖。上述示例就屬于后者。針對等待鎖釋放也有兩種方案:

客戶端輪訓:當未獲得鎖時,等待一段時間再重新獲取,直到成功。上述示例就是基于這種方式實現的。這種方式的缺點也很明顯,比較耗費服務器資源,當并發量大時會影響服務器的效率。

使用 Redis 的訂閱發布功能:當獲取鎖失敗時,訂閱鎖釋放消息,獲取鎖成功后釋放時,發送釋放消息。

集群中的主備切換和腦裂

在 Redis 包含主從同步的集群部署方式中,如果主節點掛掉,從節點提升為主節點。如果客戶端 A 在主節點加鎖成功,指令還未同步到從節點,此時主節點掛掉,從節點升為主節點,新的主節點中沒有鎖的數據。這種情況下,客戶端 B 就可能加鎖成功,從而出現并發的場景。

當集群發生腦裂時,Redis master 節點跟 slave 節點和 sentinel 集群處于不同的網絡分區。sentinel 集群無法感知到 master 的存在,會將 slave 節點提升為 master 節點,此時就會存在兩個不同的 master 節點。從而也會導致并發問題的出現。Redis Cluster 集群部署方式同理。

到此,關于“如何理解分布式系統下基于 Redis 的分布式鎖”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-07-27發表,共計15541字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 东兴市| 昌乐县| 灌阳县| 太谷县| 新巴尔虎左旗| 汶上县| 静海县| 灵川县| 手游| 廊坊市| 建宁县| 江油市| 永济市| 衡东县| 方城县| 舟山市| 澎湖县| 乌什县| 普兰县| 黄浦区| 成都市| 满城县| 林州市| 牟定县| 新乡县| 邛崃市| 白银市| 密山市| 灵宝市| 云南省| 南开区| 繁昌县| 霍山县| 芜湖市| 高安市| 五河县| 台东县| 甘孜县| 新乡市| 扎鲁特旗| 洪洞县|