久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

zookeeper分布式鎖實現的方法是什么

168次閱讀
沒有評論

共計 8849 個字符,預計需要花費 23 分鐘才能閱讀完成。

本篇內容介紹了“zookeeper 分布式鎖實現的方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

一。為何使用分布式鎖?
當應用服務器數量超過 1 臺,對相同數據的訪問可能造成訪問沖突(特別是寫沖突)。單純使用關系數據庫比如 MYSQL 的應用可以借助于事務來實現鎖,也可以使用版本號等實現樂觀鎖,最大的缺陷就是可用性降低(性能差)。對于 GLEASY 這種滿足大規模并發訪問請求的應用來說,使用數據庫事務來實現數據庫就有些捉襟見肘了。另外對于一些不依賴數據庫的應用,比如分布式文件系統,為了保證同一文件在大量讀寫操作情況下的正確性,必須引入分布式鎖來約束對同一文件的并發操作。

二。對分布式鎖的要求
1. 高性能(分布式鎖不能成為系統的性能瓶頸)
2. 避免死鎖(拿到鎖的結點掛掉不會導致其它結點永遠無法繼續)
3. 支持鎖重入

三。方案 1,基于 zookeeper 的分布式鎖

/**
* DistributedLockUtil.java
*  分布式鎖工廠類,所有分布式請求都由該工廠類負責
public class DistributedLockUtil {private static Object schemeLock = new Object();
 private static Object mutexLock = new Object();
 private static Map String,Object  mutexLockMap = new ConcurrentHashMap(); 
 private String schema;
 private Map String,DistributedReentrantLock  cache = new ConcurrentHashMap String,DistributedReentrantLock 
 private static Map String,DistributedLockUtil  instances = new ConcurrentHashMap();
 public static DistributedLockUtil getInstance(String schema){DistributedLockUtil u = instances.get(schema);
 if(u==null){synchronized(schemeLock){u = instances.get(schema);
 if(u == null){u = new DistributedLockUtil(schema);
 instances.put(schema, u);
 return u;
 private DistributedLockUtil(String schema){
 this.schema = schema;
 private Object getMutex(String key){Object mx = mutexLockMap.get(key);
 if(mx == null){synchronized(mutexLock){mx = mutexLockMap.get(key);
 if(mx==null){mx = new Object();
 mutexLockMap.put(key,mx);
 return mx;
 private DistributedReentrantLock getLock(String key){DistributedReentrantLock lock = cache.get(key);
 if(lock == null){synchronized(getMutex(key)){lock = cache.get(key);
 if(lock == null){lock = new DistributedReentrantLock(key,schema);
 cache.put(key, lock);
 return lock;
 public void reset(){for(String s : cache.keySet()){getLock(s).unlock();
  *  嘗試加鎖
  *  如果當前線程已經擁有該鎖的話, 直接返回 false, 表示不用再次加鎖, 此時不應該再調用 unlock 進行解鎖
  * 
  * @param key
  * @return
  * @throws InterruptedException
  * @throws KeeperException
  */
 public LockStat lock(String key) throws InterruptedException, KeeperException{if(getLock(key).isOwner()){
 return LockStat.NONEED;
 getLock(key).lock();
 return LockStat.SUCCESS;
 public void clearLock(String key) throws InterruptedException, KeeperException{synchronized(getMutex(key)){DistributedReentrantLock l = cache.get(key);
 l.clear();
 cache.remove(key);
 public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{unlock(key,stat,false);
 public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{if(stat == null) return;
 if(LockStat.SUCCESS.equals(stat)){DistributedReentrantLock lock = getLock(key);
 boolean hasWaiter = lock.unlock();
 if(!hasWaiter   !keepalive){synchronized(getMutex(key)){lock.clear();
 cache.remove(key);
 public static enum LockStat{
 NONEED,
 SUCCESS
}

/**
*DistributedReentrantLock.java
* 本地線程之間鎖爭用,先使用虛擬機內部鎖機制,減少結點間通信開銷
public class DistributedReentrantLock {private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class);
 private ReentrantLock reentrantLock = new ReentrantLock();
 private WriteLock writeLock;
 private long timeout = 3*60*1000;
 
 private final Object mutex = new Object();
 private String dir;
 private String schema;
 
 private final ExitListener exitListener = new ExitListener(){
 @Override
 public void execute() {initWriteLock();
 private synchronized void initWriteLock(){
 logger.debug( 初始化 writeLock 
 writeLock = new WriteLock(dir,new LockListener(){
 @Override
 public void lockAcquired() {synchronized(mutex){mutex.notify();
 @Override
 public void lockReleased() {
  
  },schema);
 if(writeLock != null   writeLock.zk != null){writeLock.zk.addExitListener(exitListener);
 synchronized(mutex){mutex.notify();
 public DistributedReentrantLock(String dir,String schema) { 
  this.dir = dir;
  this.schema = schema;
  initWriteLock();
 }
 public void lock(long timeout) throws InterruptedException, KeeperException { reentrantLock.lock();// 多線程競爭時,先拿到第一層鎖
 try{  boolean res = writeLock.trylock();
  if(!res){  synchronized(mutex){mutex.wait(timeout);
   if(writeLock == null || !writeLock.isOwner()){
   throw new InterruptedException( 鎖超時 
   }
  }
 }catch(InterruptedException e){  reentrantLock.unlock();
  throw e;
 }catch(KeeperException e){  reentrantLock.unlock();
  throw e;
 }
 }
 
 public void lock() throws InterruptedException, KeeperException {  lock(timeout);
 }
 public void destroy() throws KeeperException {  writeLock.unlock();
 }
 
 public boolean unlock(){  if(!isOwner()) return false;
 try{  writeLock.unlock();
  reentrantLock.unlock();// 多線程競爭時,釋放最外層鎖
 }catch(RuntimeException e){  reentrantLock.unlock();// 多線程競爭時,釋放最外層鎖
  throw e;
 }
 
 return reentrantLock.hasQueuedThreads();
 }

 public boolean isOwner() { return reentrantLock.isHeldByCurrentThread()   writeLock.isOwner();  } public void clear() {writeLock.clear(); }

/**
*WriteLock.java
* 基于 zk 的鎖實現
* 一個最簡單的場景如下:*1. 結點 A 請求加鎖,在特定路徑下注冊自己(會話自增結點 ),得到一個 ID 號 1
*2. 結點 B 請求加鎖,在特定路徑下注冊自己(會話自增結點 ),得到一個 ID 號 2
*3. 結點 A 獲取所有結點 ID,判斷出來自己是最小結點號,于是獲得鎖
*4. 結點 B 獲取所有結點 ID,判斷出來自己不是最小結點,于是監聽小于自己的最大結點(結點 A)變更事件
*5. 結點 A 拿到鎖,處理業務,處理完,釋放鎖(刪除自己)*6. 結點 B 收到結點 A 變更事件,判斷出來自己已經是最小結點號,于是獲得鎖。public class WriteLock extends ZkPrimative { private static final Logger LOG = Logger.getLogger(WriteLock.class);
 private final String dir;
 private String id;
 private LockNode idName;
 private String ownerId;
 private String lastChildId;
 private byte[] data = {0x12, 0x34};
 private LockListener callback;
 
 public WriteLock(String dir,String schema) { super(schema,true);
 this.dir = dir;
 }
 
 public WriteLock(String dir,LockListener callback,String schema) {  this(dir,schema);
 this.callback = callback;
 }
 public LockListener getLockListener() {
 return this.callback;
 }
 
 public void setLockListener(LockListener callback) {
 this.callback = callback;
 }
 public synchronized void unlock() throws RuntimeException {  if(zk == null || zk.isClosed()){
  return;
  }
 if (id != null) {
 try {   zk.delete(id, -1); 
 } catch (InterruptedException e) { LOG.warn( Caught:   + e, e);
 //set that we have been interrupted.
 Thread.currentThread().interrupt();
 } catch (KeeperException.NoNodeException e) {
 // do nothing
 } catch (KeeperException e) { LOG.warn( Caught:   + e, e);
 throw (RuntimeException) new RuntimeException(e.getMessage()).
 initCause(e);
 }finally { if (callback != null) { callback.lockReleased();
 }
 id = null;
 }
 }
 }
 
 private class LockWatcher implements Watcher { public void process(WatchedEvent event) { LOG.debug( Watcher fired on path:   + event.getPath() +   state:   + 
 event.getState() +   type   + event.getType());
 try { trylock();
 } catch (Exception e) { LOG.warn( Failed to acquire lock:   + e, e);
 }
 }
 }
 
 private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) 
 throws KeeperException, InterruptedException { List String  names = zookeeper.getChildren(dir, false);
 for (String name : names) { if (name.startsWith(prefix)) {
 id = dir +  /  + name;
 if (LOG.isDebugEnabled()) { LOG.debug( Found id created last time:   + id);
 }
 break;
 }
 }
 if (id == null) {
 id = zookeeper.create(dir +  /  + prefix, data, 
 acl, EPHEMERAL_SEQUENTIAL);
 if (LOG.isDebugEnabled()) { LOG.debug( Created id:   + id);
 }
 }
 }
 public void clear() {if(zk == null || zk.isClosed()){
  return;
  }
 try {zk.delete(dir, -1);
 } catch (Exception e) { LOG.error( clear error:   + e,e);
 } 
 public synchronized boolean trylock() throws KeeperException, InterruptedException {  if(zk == null){
  LOG.info( zk  是空 
  return false;
  }
 if (zk.isClosed()) {
  LOG.info( zk  已經關閉 
 return false;
 }
 ensurePathExists(dir);
 
 LOG.debug(id: +id);
 do { if (id == null) { long sessionId = zk.getSessionId();
 String prefix =  x-  + sessionId +  - 
 idName = new LockNode(id);
 LOG.debug(idName: +idName);
 }
 if (id != null) { List String  names = zk.getChildren(dir, false);
 if (names.isEmpty()) {
 LOG.warn( No children in:   + dir +   when we ve just   +
  created one! Lets recreate it... 
 id = null;
 } else {
 SortedSet LockNode  sortedNames = new TreeSet LockNode 
 for (String name : names) { sortedNames.add(new LockNode(dir +  /  + name));
 }
 ownerId = sortedNames.first().getName();
 LOG.debug(all: +sortedNames);
 SortedSet LockNode  lessThanMe = sortedNames.headSet(idName);
 LOG.debug(less than me: +lessThanMe);
 if (!lessThanMe.isEmpty()) {  LockNode lastChildName = lessThanMe.last();
 lastChildId = lastChildName.getName();
 if (LOG.isDebugEnabled()) { LOG.debug( watching less than me node:   + lastChildId);
 }
 Stat stat = zk.exists(lastChildId, new LockWatcher());
 if (stat != null) {
 return Boolean.FALSE;
 } else {
 LOG.warn( Could not find the  +
   stats for less than me:   + lastChildName.getName());
 }
 } else { if (isOwner()) { if (callback != null) { callback.lockAcquired();
 }
 return Boolean.TRUE;
 }
 }
 }
 }
 }
 while (id == null);
 return Boolean.FALSE;
 }
 public String getDir() {
 return dir;
 }
 public boolean isOwner() { return id != null   ownerId != null   id.equals(ownerId);
 }
 public String getId() {
 return this.id;
 }
}

使用本方案實現的分布式鎖,可以很好地解決鎖重入的問題,而且使用會話結點來避免死鎖;性能方面,根據筆者自測結果,加鎖解鎖各一次算是一個操作,本方案實現的分布式鎖,TPS 大概為 2000-3000,性能比較一般

“zookeeper 分布式鎖實現的方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計8849字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 华阴市| 蒙山县| 潜山县| 建阳市| 太仓市| 常宁市| 堆龙德庆县| 尼勒克县| 安国市| 曲松县| 弥渡县| 湘西| 齐齐哈尔市| 三穗县| 蒲江县| 宜春市| 台州市| 太白县| 九寨沟县| 荔波县| 怀来县| 洛扎县| 肇州县| 遵义市| 循化| 通江县| 桦南县| 通榆县| 敦煌市| 承德市| 正蓝旗| 肥东县| 冷水江市| 城固县| 武鸣县| 盈江县| 渝中区| 马龙县| 甘南县| 秦皇岛市| 天长市|