久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

ZooKeeper同步框架怎么實現

158次閱讀
沒有評論

共計 5479 個字符,預計需要花費 14 分鐘才能閱讀完成。

本篇內容主要講解“ZooKeeper 同步框架怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“ZooKeeper 同步框架怎么實現”吧!

首先,定義一個同步接口,它有一個 execute 方法,主要負責同步任務的實現。

Path 參數是任務節點 (用戶),只有相同的節點才會同步工作。想象一下,去銀行取錢,如果每個人都有一個專屬的柜臺,那效率是明顯的。

SynchronousProcessor 參數用來處理具體的業務。

Synchronous.java

package org.bigmouth.nvwa.zookeeper.concurrent;
 
 
 *  同步,支持分布式
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public interface Synchronous {
 
 /**
 *  同步執行,根據 path 標識來區分同步工作。不同的 path 將不會同步進行。 * 
 * @param 處理結果類型
 * @param path  任務節點
 * e.g.  /project/synchronous/0000001 
 * @param processor  業務處理器
 * @return  處理結果
 */T execute(String path, SynchronousProcessorprocessor);
}

MutexLockSynchronous.java

Synchronous 的實現類,基于普通排它鎖的方式實現。

package org.bigmouth.nvwa.zookeeper.concurrent;
 
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.common.PathUtils;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
 
 
 *  基于普通排他鎖的方式實現同步
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public class MutexLockSynchronous implements Synchronous {
 
 private final ZkClientHolder zkClientHolder;
 
 public MutexLockSynchronous(ZkClientHolder zkClientHolder) {
 this.zkClientHolder = zkClientHolder;
 }
 
 @Override
 publicT execute(String path, SynchronousProcessorprocessor) { PathUtils.validatePath(path);
 InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path);
 try { lock.acquire();
 if (null != processor)
 return processor.process();
 }
 catch (Exception e) { if (null != processor)
 processor.exceptionCaught(e);
 }
 finally {
 try { lock.release();
 }
 catch (Exception e) { }
 }
 return null;
 }
}

SynchronousProcessor.java

任務處理器接口,實現它來完成具體的業務工作

package org.bigmouth.nvwa.zookeeper.concurrent;
 
 
 *  同步業務處理器
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public interface SynchronousProcessor{
 
 /**
 *  處理具體的業務
 * 
 * @return
 */
 T process();
 
 /**
 *  異常捕獲
 * 
 * @param throwable
 */
 void exceptionCaught(Throwable throwable);
}

ZkClientHolder.java

當然少不了這個了,繼承的父類可以不需要了解,就是定義了兩個抽象方法:doInit 和 doDestroy 方法。

package org.bigmouth.nvwa.zookeeper;
 
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.bigmouth.nvwa.utils.BaseLifeCycleSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.google.common.base.Preconditions;
 
 
 * ZooKeeper client holder
 * 
 * @author Allen Hu 
 * 2015-4-16
 */
public class ZkClientHolder extends BaseLifeCycleSupport {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class);
 
 public static final int MAX_RETRIES = 3;
 public static final int BASE_SLEEP_TIMEMS = 3000;
 
 private CuratorFramework zkClient;
 
 private final String connectString;
 private final int sessionTimeout;
 
 public ZkClientHolder(String connectString, int sessionTimeout) { Preconditions.checkArgument(StringUtils.isNotBlank(connectString),  connectString cannot be blank 
 Preconditions.checkArgument(sessionTimeout  = 10000,  sessionTimeout must be greater than 10000 
 this.connectString = connectString;
 this.sessionTimeout = sessionTimeout;
 }
 
 public CuratorFramework get() {
 return zkClient;
 }
 
 @Override
 protected void doInit() { zkClient = CuratorFrameworkFactory.builder()
 .sessionTimeoutMs(sessionTimeout)
 .connectString(connectString)
 .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES))
 .build();
 zkClient.start();
 if (LOGGER.isInfoEnabled()) { LOGGER.info( Connected to ZooKepper server: {} , connectString);
 }
 }
 
 @Override
 protected void doDestroy() { if (null != zkClient)
 zkClient.close();
 }
}

最后來個測試類,模擬多個用戶多線程處理任務的過程,我們達到了相同用戶間同步的目的。

package org.bigmouth.nvwa.zookeeper.concurrent;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import org.apache.curator.utils.ZKPaths;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 
 * 
 * @author Allen Hu 
 * 2015-4-17
 */
public class ConcurrentTest {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class);
 private ZkClientHolder zkClientHolder = new ZkClientHolder(172.16.3.24:2181 , 60000);
 private Synchronous synchronous;
 
 public ConcurrentTest() { zkClientHolder.init();
 synchronous = new MutexLockSynchronous(zkClientHolder);
 }
 
 public class Service implements Runnable {
 
 private final String id;
 private final long sleepInMillis;
 
 public Service(String id, long sleepInMillis) {
 this.id = id;
 this.sleepInMillis = sleepInMillis;
 }
 
 @Override
 public void run() { synchronous.execute(ZKPaths.makePath( /nvwa/zookeeper/concurrent , id), new SynchronousProcessor() {
 
 @Override
 public String process() {
 LOGGER.info(id +   star...! 
 try { Thread.sleep(sleepInMillis);
 }
 catch (InterruptedException e) { e.printStackTrace();
 }
 LOGGER.info(id +   has execution! 
 return id;
 }
 
 @Override
 public void exceptionCaught(Throwable throwable) { throwable.printStackTrace();
 }
 });
 }
 }
 
 static ExecutorService executor = Executors.newCachedThreadPool();
 
 public static void main(String[] args) { ConcurrentTest ct = new ConcurrentTest();
 executor.submit(ct.new Service( 1 , 5000)); // 1 號   處理 5 秒
 executor.submit(ct.new Service( 1 , 2000)); // 1 號   處理 2 秒
 executor.submit(ct.new Service( 2 , 5000)); // 2 號   處理 5 秒
 executor.submit(ct.new Service( 3 , 10000)); // 3 號   處理 10 秒
 executor.submit(ct.new Service( 3 , 500)); // 3 號   處理 0.5 秒
 }
}

輸出結果,1、2、3 任務并行,而相同的任務串行。如:第二個 1 號等第一個 1 號執行完才開始。

到此,相信大家對“ZooKeeper 同步框架怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計5479字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 宣武区| 辽中县| 织金县| 恩平市| 宣威市| 富宁县| 黔江区| 贵州省| 北碚区| 循化| 新平| 冀州市| 盐边县| 洪洞县| 陇南市| 大英县| 恭城| 安新县| 赤城县| 汉中市| 芦山县| 威信县| 武平县| 兖州市| 澄城县| 蒙城县| 冀州市| 东兰县| 潜山县| 延川县| 河南省| 车致| 梁山县| 揭西县| 清水河县| 疏勒县| 马公市| 高密市| 措勤县| 茂名市| 介休市|