久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm中如何進(jìn)行Librato的Metric度量的實(shí)現(xiàn)

共計(jì) 5512 個(gè)字符,預(yù)計(jì)需要花費(fèi) 14 分鐘才能閱讀完成。

Storm 中如何進(jìn)行 Librato 的 Metric 度量的實(shí)現(xiàn),很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面丸趣 TV 小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

輻射性質(zhì)介紹一個(gè) Librato 的 Metric 度量的實(shí)現(xiàn)

package com.digitalpebble.storm.crawler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import com.librato.metrics.HttpPoster;
import com.librato.metrics.HttpPoster.Response;
import com.librato.metrics.LibratoBatch;
import com.librato.metrics.NingHttpPoster;
import com.librato.metrics.Sanitizer;
import com.librato.metrics.Versions;
/** Sends the metrics to Librato **/
public class LibratoMetricsConsumer implements IMetricsConsumer {
 public static final int DEFAULT_BATCH_SIZE = 500;
 private static final Logger LOG = LoggerFactory
 .getLogger(LibratoMetricsConsumer.class);
 private static final String LIB_VERSION = Versions.getVersion(
 META-INF/maven/com.librato.metrics/librato-java/pom.properties ,
 LibratoBatch.class);
 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 private final Sanitizer sanitizer = new Sanitizer() {public String apply(String name) {return Sanitizer.LAST_PASS.apply(name);
 private int postBatchSize = DEFAULT_BATCH_SIZE;
 private long timeout = 30;
 private final TimeUnit timeoutUnit = TimeUnit.SECONDS;
 private String userAgent = null;
 private HttpPoster httpPoster;
 private Set String  metricsToKeep = new HashSet String 
 public void prepare(Map stormConf, Object registrationArgument,
 TopologyContext context, IErrorReporter errorReporter) {
 // TODO configure timeouts
 // this.timeout = timeout;
 // this.timeoutUnit = timeoutUnit;
 // this.postBatchSize = postBatchSize;
 String agentIdentifier = (String) stormConf.get( librato.agent 
 if (agentIdentifier == null)
 agentIdentifier =  storm 
 String token = (String) stormConf.get( librato.token 
 String username = (String) stormConf.get( librato.username 
 String apiUrl = (String) stormConf.get( librato.api.url 
 if (apiUrl == null)
 apiUrl =  https://metrics-api.librato.com/v1/metrics 
 // check that the values are not null
 if (StringUtils.isBlank(token))
 throw new RuntimeException( librato.token not set 
 if (StringUtils.isBlank(username))
 throw new RuntimeException( librato.username not set 
 this.userAgent = String.format( %s librato-java/%s , agentIdentifier,
 LIB_VERSION);
 this.httpPoster = NingHttpPoster.newPoster(username, token, apiUrl);
 // get the list of metrics names to keep if any
 String metrics2keep = (String) stormConf.get( librato.metrics.to.keep 
 if (metrics2keep != null) {String[] mets = metrics2keep.split( , 
 for (String m : mets)
 metricsToKeep.add(m.trim().toLowerCase());
 // post(String source, long epoch)
 public void handleDataPoints(TaskInfo taskInfo,
 Collection DataPoint  dataPoints) {
 final Map String, Object  payloadMap = new HashMap String, Object 
 payloadMap.put( source , taskInfo.srcComponentId +  _ 
 + taskInfo.srcWorkerHost +  _  + taskInfo.srcTaskId);
 payloadMap.put(measure_time , taskInfo.timestamp);
 final List Map String, Object  gaugeData = new ArrayList Map String, Object ();
 final List Map String, Object  counterData = new ArrayList Map String, Object ();
 int counter = 0;
 final Iterator DataPoint  datapointsIterator = dataPoints.iterator();
 while (datapointsIterator.hasNext()) {final DataPoint dataPoint = datapointsIterator.next();
 // ignore datapoint with a value which is not a map
 if (!(dataPoint.value instanceof Map))
 continue;
 // a counter or a gauge
 // convention if its name contains  _counter 
 // then treat it as a counter
 boolean isCounter = false;
 if (dataPoint.name.contains( _counter)) {
 isCounter = true;
 dataPoint.name = dataPoint.name.replaceFirst( _counter ,  
 if (!metricsToKeep.isEmpty()) {if (!metricsToKeep.contains(dataPoint.name.toLowerCase())) {
 continue;
 try {Map String, Number  metric = (Map String, Number) dataPoint.value;
 for (Map.Entry String, Number  entry : metric.entrySet()) {String metricId = entry.getKey();
 Number val = entry.getValue();
 final Map String, Object  data = new HashMap String, Object 
 data.put( name ,
 sanitizer.apply(dataPoint.name +  _  + metricId));
 data.put(value , val);
 if (isCounter)
 counterData.add(data);
 else
 // use as gauge
 gaugeData.add(data);
 counter++;
 if (counter % postBatchSize == 0
 || (!datapointsIterator.hasNext()   (!counterData
 .isEmpty() || !gaugeData.isEmpty()))) {
 final String countersKey =  counters 
 final String gaugesKey =  gauges 
 payloadMap.put(countersKey, counterData);
 payloadMap.put(gaugesKey, gaugeData);
 postPortion(payloadMap);
 payloadMap.remove(gaugesKey);
 payloadMap.remove(countersKey);
 gaugeData.clear();
 counterData.clear();} catch (RuntimeException e) {LOG.error(e.getMessage());
 LOG.debug(Posted {} measurements , counter);
 public void cleanup() {private void postPortion(Map String, Object  chunk) {
 try {final String payload = OBJECT_MAPPER.writeValueAsString(chunk);
 final Future Response  future = httpPoster.post(userAgent, payload);
 final Response response = future.get(timeout, timeoutUnit);
 final int statusCode = response.getStatusCode();
 if (statusCode   200 || statusCode  = 300) {
 LOG.error(Received an error from Librato API. Code : {}, Message: {} ,
 statusCode, response.getBody());
 } catch (Exception e) {LOG.error( Unable to post to Librato API , e);
}

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注丸趣 TV 行業(yè)資訊頻道,感謝您對(duì)丸趣 TV 的支持。

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計(jì)5512字。
轉(zhuǎn)載說(shuō)明:除特殊說(shuō)明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒(méi)有評(píng)論)
主站蜘蛛池模板: 石首市| 德保县| 杭州市| 信宜市| 洮南市| 新疆| 罗江县| 江山市| 宁远县| 南投县| 呼玛县| 休宁县| 明水县| 潼关县| 深州市| 浮山县| 白朗县| 若羌县| 崇信县| 边坝县| 西安市| 防城港市| 射洪县| 抚顺市| 金阳县| 南漳县| 南皮县| 梁河县| 安泽县| 龙泉市| 汉沽区| 临邑县| 洛川县| 澄江县| 万全县| 乐清市| 泉州市| 天台县| 南投市| 田东县| 长兴县|