共計 5479 個字符,預計需要花費 14 分鐘才能閱讀完成。
本篇內容主要講解“ZooKeeper 同步框架怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“ZooKeeper 同步框架怎么實現”吧!
首先,定義一個同步接口,它有一個 execute 方法,主要負責同步任務的實現。
Path 參數是任務節點 (用戶),只有相同的節點才會同步工作。想象一下,去銀行取錢,如果每個人都有一個專屬的柜臺,那效率是明顯的。
SynchronousProcessor 參數用來處理具體的業務。
Synchronous.java
package org.bigmouth.nvwa.zookeeper.concurrent;
* 同步,支持分布式
*
* @author Allen Hu
* 2015-4-17
*/
public interface Synchronous {
/**
* 同步執行,根據 path 標識來區分同步工作。不同的 path 將不會同步進行。 *
* @param 處理結果類型
* @param path 任務節點
* e.g. /project/synchronous/0000001
* @param processor 業務處理器
* @return 處理結果
*/T execute(String path, SynchronousProcessorprocessor);
}
MutexLockSynchronous.java
Synchronous 的實現類,基于普通排它鎖的方式實現。
package org.bigmouth.nvwa.zookeeper.concurrent;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.common.PathUtils;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
* 基于普通排他鎖的方式實現同步
*
* @author Allen Hu
* 2015-4-17
*/
public class MutexLockSynchronous implements Synchronous {
private final ZkClientHolder zkClientHolder;
public MutexLockSynchronous(ZkClientHolder zkClientHolder) {
this.zkClientHolder = zkClientHolder;
}
@Override
publicT execute(String path, SynchronousProcessorprocessor) { PathUtils.validatePath(path);
InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path);
try { lock.acquire();
if (null != processor)
return processor.process();
}
catch (Exception e) { if (null != processor)
processor.exceptionCaught(e);
}
finally {
try { lock.release();
}
catch (Exception e) { }
}
return null;
}
}
SynchronousProcessor.java
任務處理器接口,實現它來完成具體的業務工作
package org.bigmouth.nvwa.zookeeper.concurrent;
* 同步業務處理器
*
* @author Allen Hu
* 2015-4-17
*/
public interface SynchronousProcessor{
/**
* 處理具體的業務
*
* @return
*/
T process();
/**
* 異常捕獲
*
* @param throwable
*/
void exceptionCaught(Throwable throwable);
}
ZkClientHolder.java
當然少不了這個了,繼承的父類可以不需要了解,就是定義了兩個抽象方法:doInit 和 doDestroy 方法。
package org.bigmouth.nvwa.zookeeper;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.bigmouth.nvwa.utils.BaseLifeCycleSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
* ZooKeeper client holder
*
* @author Allen Hu
* 2015-4-16
*/
public class ZkClientHolder extends BaseLifeCycleSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class);
public static final int MAX_RETRIES = 3;
public static final int BASE_SLEEP_TIMEMS = 3000;
private CuratorFramework zkClient;
private final String connectString;
private final int sessionTimeout;
public ZkClientHolder(String connectString, int sessionTimeout) { Preconditions.checkArgument(StringUtils.isNotBlank(connectString), connectString cannot be blank
Preconditions.checkArgument(sessionTimeout = 10000, sessionTimeout must be greater than 10000
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
}
public CuratorFramework get() {
return zkClient;
}
@Override
protected void doInit() { zkClient = CuratorFrameworkFactory.builder()
.sessionTimeoutMs(sessionTimeout)
.connectString(connectString)
.retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES))
.build();
zkClient.start();
if (LOGGER.isInfoEnabled()) { LOGGER.info( Connected to ZooKepper server: {} , connectString);
}
}
@Override
protected void doDestroy() { if (null != zkClient)
zkClient.close();
}
}
最后來個測試類,模擬多個用戶多線程處理任務的過程,我們達到了相同用戶間同步的目的。
package org.bigmouth.nvwa.zookeeper.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.utils.ZKPaths;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @author Allen Hu
* 2015-4-17
*/
public class ConcurrentTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class);
private ZkClientHolder zkClientHolder = new ZkClientHolder(172.16.3.24:2181 , 60000);
private Synchronous synchronous;
public ConcurrentTest() { zkClientHolder.init();
synchronous = new MutexLockSynchronous(zkClientHolder);
}
public class Service implements Runnable {
private final String id;
private final long sleepInMillis;
public Service(String id, long sleepInMillis) {
this.id = id;
this.sleepInMillis = sleepInMillis;
}
@Override
public void run() { synchronous.execute(ZKPaths.makePath( /nvwa/zookeeper/concurrent , id), new SynchronousProcessor() {
@Override
public String process() {
LOGGER.info(id + star...!
try { Thread.sleep(sleepInMillis);
}
catch (InterruptedException e) { e.printStackTrace();
}
LOGGER.info(id + has execution!
return id;
}
@Override
public void exceptionCaught(Throwable throwable) { throwable.printStackTrace();
}
});
}
}
static ExecutorService executor = Executors.newCachedThreadPool();
public static void main(String[] args) { ConcurrentTest ct = new ConcurrentTest();
executor.submit(ct.new Service( 1 , 5000)); // 1 號 處理 5 秒
executor.submit(ct.new Service( 1 , 2000)); // 1 號 處理 2 秒
executor.submit(ct.new Service( 2 , 5000)); // 2 號 處理 5 秒
executor.submit(ct.new Service( 3 , 10000)); // 3 號 處理 10 秒
executor.submit(ct.new Service( 3 , 500)); // 3 號 處理 0.5 秒
}
}
輸出結果,1、2、3 任務并行,而相同的任務串行。如:第二個 1 號等第一個 1 號執行完才開始。
到此,相信大家對“ZooKeeper 同步框架怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!