久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

hbase0.98.9中如何實現endpoints

185次閱讀
沒有評論

共計 6216 個字符,預計需要花費 16 分鐘才能閱讀完成。

本篇文章為大家展示了 hbase0.98.9 中如何實現 endpoints,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

定制一個 endpoint 的過程。

下面是實現過程:

1、定義接口描述文件(該功能有 protobuf 提供出來)

option java_package =  coprocessor.endpoints.generated 
option java_outer_classname =  RowCounterEndpointProtos 
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message CountRequest {
message CountResponse { required int64 count = 1 [default = 0];
service RowCountService { rpc getRowCount(CountRequest)
 returns (CountResponse);
 rpc getKeyValueCount(CountRequest)
 returns (CountResponse);
}

這個文件我直接拿的 hbase 提供的 example 中的例子。其中的語法應該有過類似經驗的一看就清楚了,實在不清楚就請查查 protobuf 的幫助手冊吧。

2、根據接口描述文件生成 java 接口類(該功能有 protobuf 提供出來)

有了接口描述文件,還需要生成 java 語言的接口類。這個需要借助 protobuf 提供的工具 protoc。

$protoc --java_out=./ Examples.proto

簡單解釋下,protoc 這個命令在你裝了 protobuf 后就有了。Examples.proto 這個是文件名,也就是剛才編寫的那個接口描述文件。“–java_out”這個用來指定生成后的 java 類放的地方。

所以,這地方如果你沒有裝 protobuf,你需要裝一個,window 和 linux 版都有,多說一句,如果你去裝 hadoop64 位的編譯環境的話,應該是要裝 protobuf。

3、實現接口

package coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;
public class RowCounterEndpointExample extends RowCountService implements
 Coprocessor, CoprocessorService {
 private RegionCoprocessorEnvironment env;
 public RowCounterEndpointExample() {
 @Override
 public Service getService() {
 return this;
 @Override
 public void getRowCount(RpcController controller, CountRequest request,
 RpcCallback CountResponse  done) {Scan scan = new Scan();
 scan.setFilter(new FirstKeyOnlyFilter());
 CountResponse response = null;
 InternalScanner scanner = null;
 try {scanner = env.getRegion().getScanner(scan);
 List Cell  results = new ArrayList Cell 
 boolean hasMore = false;
 byte[] lastRow = null;
 long count = 0;
 do {hasMore = scanner.next(results);
 for (Cell kv : results) {byte[] currentRow = CellUtil.cloneRow(kv);
 if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
 lastRow = currentRow;
 count++;
 results.clear();} while (hasMore);
 response = CountResponse.newBuilder().setCount(count).build();} catch (IOException ioe) {ResponseConverter.setControllerException(controller, ioe);
 } finally {if (scanner != null) {
 try {scanner.close();
 } catch (IOException ignored) {done.run(response);
 @Override
 public void getKeyValueCount(RpcController controller,
 CountRequest request, RpcCallback CountResponse  done) {
 CountResponse response = null;
 InternalScanner scanner = null;
 try {scanner = env.getRegion().getScanner(new Scan());
 List Cell  results = new ArrayList Cell 
 boolean hasMore = false;
 long count = 0;
 do {hasMore = scanner.next(results);
 for (Cell kv : results) {
 count++;
 results.clear();} while (hasMore);
 response = CountResponse.newBuilder().setCount(count).build();} catch (IOException ioe) {ResponseConverter.setControllerException(controller, ioe);
 } finally {if (scanner != null) {
 try {scanner.close();
 } catch (IOException ignored) {done.run(response);
 @Override
 public void start(CoprocessorEnvironment env) throws IOException {if (env instanceof RegionCoprocessorEnvironment) {this.env = (RegionCoprocessorEnvironment) env;
 } else {
 throw new CoprocessorException( Must be loaded on a table region! 
 @Override
 public void stop(CoprocessorEnvironment env) throws IOException {// TODO Auto-generated method stub}

4、注冊接口(Hbase 功能,通過配置文件或者表模式方式注冊)

這部分,可以看 hbase 權威指南了,我就看這部分做的。

5、測試調用

package coprocessor;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.ServiceException;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;
import util.HBaseHelper;
public class RowCounterEndpointClientExample {public static void main(String[] args) throws ServiceException, Throwable {Configuration conf = HBaseConfiguration.create();
 HBaseHelper helper = HBaseHelper.getHelper(conf);
 //helper.dropTable( testtable 
 //helper.createTable( testtable ,  colfam1 ,  colfam2 
 System.out.println( Adding rows to table... 
 helper.fillTable( testtable , 1, 10, 10,  colfam1 ,  colfam2 
 HTable table = new HTable(conf,  testtable 
 final CountRequest request = CountRequest.getDefaultInstance();
 final Batch.Call RowCountService, Long  call =new Batch.Call RowCountService, Long () {public Long call(RowCountService counter)
 throws IOException {ServerRpcController controller = new ServerRpcController();
 BlockingRpcCallback CountResponse  rpcCallback = new BlockingRpcCallback CountResponse 
 counter.getRowCount(controller, request, rpcCallback);
 CountResponse response = rpcCallback.get();
 if (controller.failedOnException()) {throw controller.getFailedOn();
 return (response != null   response.hasCount()) ? response
 .getCount() : 0;
 Map byte[], Long  results = table.coprocessorService(RowCountService.class, null, null, call);
 for(byte[] b : results.keySet()){System.err.println(Bytes.toString(b) +  :  + results.get(b));
 } 
}

上述內容就是 hbase0.98.9 中如何實現 endpoints,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計6216字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 鸡东县| 平遥县| 望城县| 南昌县| 白玉县| 都匀市| 邵东县| 鸡泽县| 上杭县| 北海市| 龙泉市| 太保市| 黑龙江省| 北辰区| 克山县| 巧家县| 石首市| 阿荣旗| 遂平县| 夏邑县| 北海市| 东丽区| 杭锦后旗| 喀喇沁旗| 惠州市| 元江| 昭觉县| 河曲县| 石家庄市| 富蕴县| 开江县| 抚州市| 布尔津县| 乐业县| 保山市| 屏南县| 安泽县| 孝昌县| 浙江省| 阳东县| 新兴县|