久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Redis分布式鎖有哪些

148次閱讀
沒有評論

共計 6775 個字符,預計需要花費 17 分鐘才能閱讀完成。

自動寫代碼機器人,免費開通

Redis 分布式鎖有哪些?這個問題可能是我們日常學習或工作經常見到的。希望通過這個問題能讓你收獲頗深。下面是丸趣 TV 小編給大家帶來的參考內容,讓我們一起來看看吧!

我們通常使用的 synchronized 或者 Lock 都是線程鎖,對同一個 JVM 進程內的多個線程有效。因為鎖的本質 是內存中存放一個標記,記錄獲取鎖的線程是誰,這個標記對每個線程都可見。然而我們啟動的多個訂單服務,就是多個 JVM,內存中的鎖顯然是不共享的,每個 JVM 進程都有自己的 鎖,自然無法保證線程的互斥了,這個時候我們就需要使用到分布式鎖了。常用的有三種解決方案:1. 基于數據庫實現 2. 基于 zookeeper 的臨時序列化節點實現 3.redis 實現。本文我們介紹的就是 redis 的實現方式。
實現分布式鎖要滿足 3 點:多進程可見,互斥,可重入。

1)多進程可見

redis 本身就是基于 JVM 之外的,因此滿足多進程可見的要求。

2)互斥

即同一時間只能有一個進程獲取鎖標記,我們可以通過 redis 的 setnx 實現,只有第一次執行的才會成功并返回 1,其它情況返回 0。

Redis 分布式鎖有哪些

釋放鎖
釋放鎖其實只需要把鎖的 key 刪除即可,使用 del xxx 指令。不過,如果在我們執行 del 之前,服務突然宕機,那么鎖就永遠無法刪除了。所以我們可以通過 setex 命令設置過期時間即可。

import java.util.UUID;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;/**
 * 第一種分布式鎖 */@Componentpublic class RedisService {private final Logger log = LoggerFactory.getLogger(this.getClass());
 @Autowired
 JedisPool jedisPool; 
 // 獲取鎖之前的超時時間 (獲取鎖的等待重試時間)
 private long acquireTimeout = 5000; // 獲取鎖之后的超時時間 (防止死鎖)
 private int timeOut = 10000; 
 * 獲取分布式鎖
 * @return 鎖標識 */
 public boolean getRedisLock(String lockName,String val) {
 Jedis jedis = null; try {jedis = jedisPool.getResource(); // 1. 計算獲取鎖的時間
 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2. 嘗試獲取鎖
 while (System.currentTimeMillis() endTime) { // 3. 獲取鎖成功就設置過期時間
 if (jedis.setnx(lockName, val) == 1) {jedis.expire(lockName, timeOut/1000); return true;
 } catch (Exception e) {log.error(e.getMessage());
 } finally {returnResource(jedis);
 } return false;
 } /**
 * 釋放分布式鎖
 * @param lockName 鎖名稱 */
 public void unRedisLock(String lockName) {
 Jedis jedis = null; try {jedis = jedisPool.getResource(); // 釋放鎖 jedis.del(lockName);
 } catch (Exception e) {log.error(e.getMessage());
 } finally {returnResource(jedis);
 }// =============================================== 
 public String get(String key) {
 Jedis jedis = null;
 String value = null; try {jedis = jedisPool.getResource();
 value = jedis.get(key);
 log.info(value);
 } catch (Exception e) {log.error(e.getMessage());
 } finally {returnResource(jedis);
 } return value;
 public void set(String key, String value) {
 Jedis jedis = null; try {jedis = jedisPool.getResource();
 jedis.set(key, value);
 } catch (Exception e) {log.error(e.getMessage());
 } finally {returnResource(jedis);
 } /**
 * 關閉連接 */
 public void returnResource(Jedis jedis) {try { if(jedis!=null) jedis.close();} catch (Exception e) {}

上面的分布式鎖實現了,但是這時候還可能出現另外 2 個問題:
一:獲取鎖時
setnx 獲取鎖成功了,還沒來得及 setex 服務就宕機了,由于這種非原子性的操作,死鎖又發生了。其實 redis 提供了 nx 與 ex 連用的命令。

Redis 分布式鎖有哪些
二:釋放鎖時
1. 3 個進程:A 和 B 和 C,在執行任務,并爭搶鎖,此時 A 獲取了鎖,并設置自動過期時間為 10s
2. A 開始執行業務,因為某種原因,業務阻塞,耗時超過了 10 秒,此時鎖自動釋放了
3. B 恰好此時開始嘗試獲取鎖,因為鎖已經自動釋放,成功獲取鎖
4. A 此時業務執行完畢,執行釋放鎖邏輯(刪除 key),于是 B 的鎖被釋放了,而 B 其實還在執行業務
5. 此時進程 C 嘗試獲取鎖,也成功了,因為 A 把 B 的鎖刪除了。
問題出現了:B 和 C 同時獲取了鎖,違反了互斥性!如何解決這個問題呢?我們應該在刪除鎖之前,判斷這個鎖是否是自己設置的鎖,如果不是(例如自己 的鎖已經超時釋放),那么就不要刪除了。所以我們可以在 set 鎖時,存入當前線程的唯一標識!刪除鎖前,判斷下里面的值是不是與自己標識釋放一 致,如果不一致,說明不是自己的鎖,就不要刪除了。

/**
 * 第二種分布式鎖 */public class RedisTool { private static final String LOCK_SUCCESS = OK 
 private static final Long RELEASE_SUCCESS = 1L; /**
 * 嘗試獲取分布式鎖
 * @param jedis Redis 客戶端
 * @param lockKey 鎖
 * @param requestId 請求標識
 * @param expireTime 超期時間
 * @return 是否獲取成功 */
 public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, NX , PX , expireTime); if (LOCK_SUCCESS.equals(result)) {return true;} return false;
 } /**
 * 釋放分布式鎖
 * @param jedis Redis 客戶端
 * @param lockKey 鎖
 * @param requestId 請求標識
 * @return 是否釋放成功 */
 public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {if (jedis.get(lockKey).equals(requestId)) {System.out.println( 釋放鎖... + Thread.currentThread().getName() + ,identifierValue: + requestId);
 jedis.del(lockKey); return true;
 } return false;
}

按照上面方式實現分布式鎖之后,就可以輕松解決大部分問題了。網上很多博客也都是這么實現的,但是仍然有些場景是不滿足的,例如一個方法獲取到鎖之后,可能在方法內調這個方法此時就獲取不到鎖了。這個時候我們就需要把鎖改進成可重入式鎖了。

Redis 分布式鎖有哪些

3)重入鎖:

也叫做遞歸鎖,指的是在同一線程內,外層函數獲得鎖之后,內層遞歸函數仍然可以獲取到該鎖。換一種說法:同一個線程再次進入同步代碼時,可以使用自己已獲取到的鎖??芍厝腈i可以避免因同一線程中多次獲取鎖而導致死鎖發生。像 synchronized 就是一個重入鎖,它是通過 moniter 函數記錄當前線程信息來實現的。實現可重入鎖需要考慮兩點:
獲取鎖:首先嘗試獲取鎖,如果獲取失敗,判斷這個鎖是否是自己的,如果是則允許再次獲取,而且必須記錄重復獲取鎖的次數。
釋放鎖:釋放鎖不能直接刪除了,因為鎖是可重入的,如果鎖進入了多次,在內層直接刪除鎖,導致外部的業務在沒有鎖的情況下執行,會有安全問題。因此必須獲取鎖時累計重入的次數,釋放時則減去重入次數,如果減到 0,則可以刪除鎖。

 下面我們假設鎖的 key 為“lock”,hashKey 是當前線程的 id:“threadId”,鎖自動釋放時間假設為 20
獲取鎖的步驟:1、判斷 lock 是否存在 EXISTS lock 
 2、不存在,則自己獲取鎖,記錄重入層數為 1. 2、存在,說明有人獲取鎖了,下面判斷是不是自己的鎖, 即判斷當前線程 id 作為 hashKey 是否存在:HEXISTS lock threadId 
 3、不存在,說明鎖已經有了,且不是自己獲取的,鎖獲取失敗. 3、存在,說明是自己獲取的鎖,重入次數 +1:HINCRBY lock threadId 1,最后更新鎖自動釋放時間,EXPIRE lock 20
 釋放鎖的步驟:1、判斷當前線程 id 作為 hashKey 是否存在:HEXISTS lock threadId 
 2、不存在,說明鎖已經失效,不用管了 
 2、存在,說明鎖還在,重入次數減 1:HINCRBY lock threadId -1,3、獲取新的重入次數,判斷重入次數是否為 0,為 0 說明鎖全部釋放,刪除 key:DEL lock

因此,存儲在鎖中的信息就必須包含:key、線程標識、重入次數。不能再使用簡單的 key-value 結構,這里推薦使用 hash 結構。
獲取鎖的腳本 (注釋刪掉, 不然運行報錯)

local key = KEYS[1]; -- 第 1 個參數, 鎖的 keylocal threadId = ARGV[1]; -- 第 2 個參數, 線程唯一標識 local releaseTime = ARGV[2]; -- 第 3 個參數, 鎖的自動釋放時間 if(redis.call( exists , key) == 0) then -- 判斷鎖是否已存在
 redis.call( hset , key, threadId, 1 -- 不存在, 則獲取鎖
 redis.call(expire , key, releaseTime); -- 設置有效期
 return 1; -- 返回結果 end;if(redis.call( hexists , key, threadId) == 1) then -- 鎖已經存在,判斷 threadId 是否是自己 
 redis.call( hincrby , key, threadId, 1 -- 如果是自己,則重入次數 +1
 redis.call(expire , key, releaseTime); -- 設置有效期
 return 1; -- 返回結果 end;return 0; -- 代碼走到這里, 說明獲取鎖的不是自己,獲取鎖失敗 

釋放鎖的腳本 (注釋刪掉, 不然運行報錯)

local key = KEYS[1]; -- 第 1 個參數, 鎖的 keylocal threadId = ARGV[1]; -- 第 2 個參數, 線程唯一標識 if (redis.call( HEXISTS , key, threadId) == 0) then -- 判斷當前鎖是否還是被自己持有
 return nil; -- 如果已經不是自己,則直接返回 end;local count = redis.call(HINCRBY , key, threadId, -1); -- 是自己的鎖,則重入次數 -1if (count == 0) then -- 判斷是否重入次數是否已經為 0
 redis.call(DEL , key); -- 等于 0 說明可以釋放鎖,直接刪除
 return nil; 
end;

完整代碼

import java.util.Collections;import java.util.UUID;import org.springframework.core.io.ClassPathResource;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.scripting.support.ResourceScriptSource;/**
 * Redis 可重入鎖 */public class RedisLock {private static final StringRedisTemplate redisTemplate = SpringUtil.getBean(StringRedisTemplate.class); private static final DefaultRedisScript Long LOCK_SCRIPT; private static final DefaultRedisScript Object UNLOCK_SCRIPT; static { // 加載釋放鎖的腳本
 LOCK_SCRIPT = new DefaultRedisScript ();
 LOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource( lock.lua)));
 LOCK_SCRIPT.setResultType(Long.class); // 加載釋放鎖的腳本
 UNLOCK_SCRIPT = new DefaultRedisScript ();
 UNLOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource( unlock.lua)));
 } /**
 * 獲取鎖
 * @param lockName 鎖名稱
 * @param releaseTime 超時時間 (單位: 秒)
 * @return key 解鎖標識 */
 public static String tryLock(String lockName,String releaseTime) { // 存入的線程信息的前綴,防止與其它 JVM 中線程信息沖突
 String key = UUID.randomUUID().toString(); // 執行腳本
 Long result = redisTemplate.execute(
 LOCK_SCRIPT,
 Collections.singletonList(lockName),
 key + Thread.currentThread().getId(), releaseTime); // 判斷結果
 if(result != null result.intValue() == 1) {return key;}else {return null;} /**
 * 釋放鎖
 * @param lockName 鎖名稱
 * @param key 解鎖標識 */
 public static void unlock(String lockName,String key) { // 執行腳本 redisTemplate.execute(
 UNLOCK_SCRIPT,
 Collections.singletonList(lockName),
 key + Thread.currentThread().getId(), null);
}

感謝各位的閱讀!看完上述內容,你們對 Redis 分布式鎖有哪些大概了解了嗎?希望文章內容對大家有所幫助。如果想了解更多相關文章內容,歡迎關注丸趣 TV 行業資訊頻道。

向 AI 問一下細節

丸趣 TV 網 – 提供最優質的資源集合!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-12-18發表,共計6775字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 富源县| 二连浩特市| 双城市| 图片| 调兵山市| 宁明县| 拜泉县| 全州县| 正宁县| 天气| 梓潼县| 永泰县| 土默特左旗| 青川县| 云龙县| 云和县| 丁青县| 高雄市| 曲周县| 姚安县| 吉首市| 株洲县| 长白| 开封市| 万宁市| 山西省| 平顺县| 桓仁| 柳江县| 威远县| 天镇县| 蛟河市| 门头沟区| 红河县| 祁阳县| 东兰县| 定边县| 延川县| 株洲市| 许昌县| 三原县|