久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

kafka的low

244次閱讀
沒有評論

共計 6776 個字符,預計需要花費 17 分鐘才能閱讀完成。

這篇文章主要介紹“kafka 的 low-level consumer 怎么使用”,在日常操作中,相信很多人在 kafka 的 low-level consumer 怎么使用問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”kafka 的 low-level consumer 怎么使用”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

一、什么時候用這個接口?

     1)Read a message multiple times

     2)Consume only a subset of the partitions in a topic in a process

    3)Manage transactions to make sure a message is processed once and only once

二、使用 SimpleConsumer 的步驟:

1)Find an active Broker and find out which Broker is the leader for your topic and partition

2)Determine who the replica Brokers are for your topic and partition

3)Build the request defining what data you are interested in

4)Fetch the data ,Identify and recover from leader changes

首先,你必須知道讀哪個 topic 的哪個 partition 

然后,找到負責該 partition 的 broker leader,從而找到存有該 partition 副本的那個 broker 

再者,自己去寫 request 并 fetch 數據  

最終,還要注意需要識別和處理 broker leader 的改變

三、代碼如下:

package kafkatest.kakfademo;

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.Collections;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import kafka.api.FetchRequest;

import kafka.api.FetchRequestBuilder;

import kafka.api.PartitionOffsetRequestInfo;

import kafka.cluster.BrokerEndPoint;

import kafka.common.ErrorMapping;

import kafka.common.TopicAndPartition;

import kafka.javaapi.FetchResponse;

import kafka.javaapi.OffsetResponse;

import kafka.javaapi.PartitionMetadata;

import kafka.javaapi.TopicMetadata;

import kafka.javaapi.TopicMetadataRequest;

import kafka.javaapi.consumer.SimpleConsumer;

import kafka.message.MessageAndOffset;

public class SimpleExample {

public static void main(String args[]) {

SimpleExample example = new SimpleExample();

long maxReads = 10;

String topicName = test

int partition = 0;

List String seeds = new ArrayList String

seeds.add(hadoop0

int port = 9092;

try {

example.run(maxReads, topicName, partition, seeds, port);

} catch (Exception e) {

System.out.println(Oops: + e);

e.printStackTrace();

}

}

private List String m_replicaBrokers = new ArrayList String

public SimpleExample() {

m_replicaBrokers = new ArrayList String

}

public void run(long a_maxReads, String a_topic, int a_partition,

List String a_seedBrokers, int a_port) throws Exception {

// find the meta data about the topic and partition we are interested in

//

PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,

a_partition);

if (metadata == null) {

System.out

.println(Can t find metadata for Topic and Partition. Exiting

return;

}

if (metadata.leader() == null) {

System.out

.println(Can t find Leader for Topic and Partition. Exiting

return;

}

String leadBroker = metadata.leader().host();

String clientName = Client_ + a_topic + _ + a_partition;

SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,

100000, 64 * 1024, clientName);

long readOffset = getLastOffset(consumer, a_topic, a_partition,

kafka.api.OffsetRequest.EarliestTime(), clientName);

int numErrors = 0;

while (a_maxReads 0) {

if (consumer == null) {

consumer = new SimpleConsumer(leadBroker, a_port, 100000,

64 * 1024, clientName);

}

// Note: this fetchSize of 100000 might need to be increased if

// large batches are written to Kafka

FetchRequest req = new FetchRequestBuilder().clientId(clientName)

.addFetch(a_topic, a_partition, readOffset, 100000).build();

FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {

numErrors++;

// Something went wrong!

short code = fetchResponse.errorCode(a_topic, a_partition);

System.out.println(Error fetching data from the Broker:

+ leadBroker + Reason: + code);

if (numErrors 5)

break;

if (code == ErrorMapping.OffsetOutOfRangeCode()) {

// We asked for an invalid offset. For simple case ask for

// the last element to reset

readOffset = getLastOffset(consumer, a_topic, a_partition,

kafka.api.OffsetRequest.LatestTime(), clientName);

continue;

}

consumer.close();

consumer = null;

leadBroker = findNewLeader(leadBroker, a_topic, a_partition,

a_port);

continue;

}

numErrors = 0;

long numRead = 0;

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(

a_topic, a_partition)) {

long currentOffset = messageAndOffset.offset();

if (currentOffset readOffset) {

System.out.println(Found an old offset: + currentOffset

+ Expecting: + readOffset);

continue;

}

readOffset = messageAndOffset.nextOffset();

ByteBuffer payload = messageAndOffset.message().payload();

byte[] bytes = new byte[payload.limit()];

payload.get(bytes);

System.out.println(String.valueOf(messageAndOffset.offset())

+ : + new String(bytes, UTF-8));

numRead++;

a_maxReads–;

}

if (numRead == 0) {

try {

Thread.sleep(1000);

} catch (InterruptedException ie) {

}

}

}

if (consumer != null)

consumer.close();

}

public static long getLastOffset(SimpleConsumer consumer, String topic,

int partition, long whichTime, String clientName) {

TopicAndPartition topicAndPartition = new TopicAndPartition(topic,

partition);

Map TopicAndPartition, PartitionOffsetRequestInfo requestInfo = new HashMap TopicAndPartition, PartitionOffsetRequestInfo

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(

whichTime, 1));

kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(

requestInfo, kafka.api.OffsetRequest.CurrentVersion(),

clientName);

OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {

System.out

.println(Error fetching data Offset Data the Broker. Reason:

+ response.errorCode(topic, partition));

return 0;

}

long[] offsets = response.offsets(topic, partition);

return offsets[0];

}

private String findNewLeader(String a_oldLeader, String a_topic,

int a_partition, int a_port) throws Exception {

for (int i = 0; i i++) {

boolean goToSleep = false;

PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,

a_topic, a_partition);

if (metadata == null) {

goToSleep = true;

} else if (metadata.leader() == null) {

goToSleep = true;

} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())

i == 0) {

// first time through if the leader hasn t changed give

// ZooKeeper a second to recover

// second time, assume the broker did recover before failover,

// or it was a non-Broker issue

//

goToSleep = true;

} else {

return metadata.leader().host();

}

if (goToSleep) {

try {

Thread.sleep(1000);

} catch (InterruptedException ie) {

}

}

}

System.out

.println(Unable to find new leader after Broker failure. Exiting

throw new Exception(

Unable to find new leader after Broker failure. Exiting

}

private PartitionMetadata findLeader(List String a_seedBrokers,

int a_port, String a_topic, int a_partition) {

PartitionMetadata returnMetaData = null;

loop: for (String seed : a_seedBrokers) {

SimpleConsumer consumer = null;

try {

consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,

leaderLookup

List String topics = Collections.singletonList(a_topic);

TopicMetadataRequest req = new TopicMetadataRequest(topics);

kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

List TopicMetadata metaData = resp.topicsMetadata();

for (TopicMetadata item : metaData) {

for (PartitionMetadata part : item.partitionsMetadata()) {

if (part.partitionId() == a_partition) {

returnMetaData = part;

break loop;

}

}

}

} catch (Exception e) {

System.out.println(Error communicating with Broker [ + seed

+ ] to find Leader for [+ a_topic + ,

+ a_partition + ] Reason: + e);

} finally {

if (consumer != null)

consumer.close();

}

}

if (returnMetaData != null) {

m_replicaBrokers.clear();

for (BrokerEndPoint replica : returnMetaData.replicas()) {

m_replicaBrokers.add(replica.host());

}

}

return returnMetaData;

}

}

四、消費結果如下:

 

 

到此,關于“kafka 的 low-level consumer 怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計6776字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 苍南县| 关岭| 呼和浩特市| 湟源县| 高碑店市| 青神县| 雷山县| 剑河县| 高密市| 深州市| 客服| 阳春市| 霍州市| 遂平县| 交城县| 浦江县| 荣昌县| 宝清县| 襄樊市| 合阳县| 吴江市| 巍山| 大庆市| 石渠县| 中宁县| 澳门| 宁都县| 阿勒泰市| 大足县| 无为县| 绩溪县| 永修县| 长治市| 武川县| 西昌市| 永泰县| 龙南县| 铜川市| 西城区| 常宁市| 霞浦县|