久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Kafka Consumer使用要注意什么

161次閱讀
沒有評論

共計 2749 個字符,預計需要花費 7 分鐘才能閱讀完成。

本篇內容主要講解“Kafka Consumer 使用要注意什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“Kafka Consumer 使用要注意什么”吧!

一、特點:

不用關心 offset, 會自動的讀 zookeeper 中該 Consumer group 的 last offset

二、注意事項

1. 如果 consumer 比 partition 多,是浪費,因為 kafka 的設計是在一個 partition 上是不允許并發的,

    所以 consumer 數不要大于 partition 數  

2. 如果 consumer 比 partition 少,一個 consumer 會對應于多個 partitions,

    這里主要合理分配 consumer 數和 partition 數,否則會導致 partition 里面的數據被取的不均勻  

    最好 partiton 數目是 consumer 數目的整數倍,所以 partition 數目很重要,

    比如取 24,就很容易設定 consumer 數目  

3. 如果 consumer 從多個 partition 讀到數據,不保證數據間的順序性,

    kafka 只保證在一個 partition 上數據是有序的,但多個 partition,根據你讀的順序會有不同  

4. 增減 consumer,broker,partition 會導致 rebalance,

      所以 rebalance 后 consumer 對應的 partition 會發生變化  

5. High-level 接口中獲取不到數據的時候是會 block 的  

三、代碼如下:

package kafkatest.kakfademo;

import java.io.UnsupportedEncodingException;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerDemo1 {

public static void main(String[] args) {

ConsumerDemo1 demo = new ConsumerDemo1();

demo.test();

}

@SuppressWarnings(rawtypes)

public void test() {

String topicName = test

int numThreads = 1;

Properties properties = new Properties();

properties.put(zookeeper.connect , hadoop0:2181 // 聲明 zk

properties.put(group.id , group–demo // 必須要使用別的組名稱,

// 如果生產者和消費者都在同一組,則不能訪問同一組內的 topic 數據

ConsumerConnector consumer = Consumer

.createJavaConsumerConnector(new ConsumerConfig(properties));

Map String, Integer topicCountMap = new HashMap String, Integer

topicCountMap.put(topicName, numThreads); // 一次從主題中獲取一個數據

Map String, List KafkaStream byte[], byte[] messageStreams = consumer

.createMessageStreams(topicCountMap);

// 獲取每次接收到的這個數據

List KafkaStream byte[], byte[] streams = messageStreams

.get(topicName);

// now launch all the threads

ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executor.execute(new ConsumerMsgTask(stream, threadNumber));

threadNumber++;

}

}

class ConsumerMsgTask implements Runnable {

private KafkaStream m_stream;

private int m_threadNumber;

public ConsumerMsgTask(KafkaStream stream, int threadNumber) {

m_threadNumber = threadNumber;

m_stream = stream;

}

public void run() {

ConsumerIterator byte[], byte[] it = m_stream.iterator();

long offset = 0;

try {

while (it.hasNext())

offset = it.next().offset();

byte[] bytes = it.next().message();

String msg = new String(bytes, UTF-8

System.out.print(offset: + offset + ,msg: + msg);

System.out.println(Shutting down Thread: + m_threadNumber);

} catch (UnsupportedEncodingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

四、實驗驗證

 

 

 

到此,相信大家對“Kafka Consumer 使用要注意什么”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計2749字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 沅陵县| 平和县| 临海市| 湖州市| 鸡泽县| 景谷| 梁平县| 读书| 巴楚县| 区。| 白银市| 峨山| 保山市| 肇州县| 安康市| 四会市| 巴中市| 东台市| 湘乡市| 墨江| 乌鲁木齐市| 凤翔县| 梧州市| 广汉市| 彩票| 西盟| 彭水| 沧州市| 类乌齐县| 萨迦县| 福安市| 民丰县| 项城市| 武功县| 芦山县| 莱州市| 通榆县| 澄江县| 西峡县| 涪陵区| 新竹县|