共計 2749 個字符,預計需要花費 7 分鐘才能閱讀完成。
本篇內容主要講解“Kafka Consumer 使用要注意什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“Kafka Consumer 使用要注意什么”吧!
一、特點:
不用關心 offset, 會自動的讀 zookeeper 中該 Consumer group 的 last offset
二、注意事項
1. 如果 consumer 比 partition 多,是浪費,因為 kafka 的設計是在一個 partition 上是不允許并發的,
所以 consumer 數不要大于 partition 數
2. 如果 consumer 比 partition 少,一個 consumer 會對應于多個 partitions,
這里主要合理分配 consumer 數和 partition 數,否則會導致 partition 里面的數據被取的不均勻
最好 partiton 數目是 consumer 數目的整數倍,所以 partition 數目很重要,
比如取 24,就很容易設定 consumer 數目
3. 如果 consumer 從多個 partition 讀到數據,不保證數據間的順序性,
kafka 只保證在一個 partition 上數據是有序的,但多個 partition,根據你讀的順序會有不同
4. 增減 consumer,broker,partition 會導致 rebalance,
所以 rebalance 后 consumer 對應的 partition 會發生變化
5. High-level 接口中獲取不到數據的時候是會 block 的
三、代碼如下:
package kafkatest.kakfademo;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerDemo1 {
public static void main(String[] args) {
ConsumerDemo1 demo = new ConsumerDemo1();
demo.test();
}
@SuppressWarnings(rawtypes)
public void test() {
String topicName = test
int numThreads = 1;
Properties properties = new Properties();
properties.put(zookeeper.connect , hadoop0:2181 // 聲明 zk
properties.put(group.id , group–demo // 必須要使用別的組名稱,
// 如果生產者和消費者都在同一組,則不能訪問同一組內的 topic 數據
ConsumerConnector consumer = Consumer
.createJavaConsumerConnector(new ConsumerConfig(properties));
Map String, Integer topicCountMap = new HashMap String, Integer
topicCountMap.put(topicName, numThreads); // 一次從主題中獲取一個數據
Map String, List KafkaStream byte[], byte[] messageStreams = consumer
.createMessageStreams(topicCountMap);
// 獲取每次接收到的這個數據
List KafkaStream byte[], byte[] streams = messageStreams
.get(topicName);
// now launch all the threads
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.execute(new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}
class ConsumerMsgTask implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
m_threadNumber = threadNumber;
m_stream = stream;
}
public void run() {
ConsumerIterator byte[], byte[] it = m_stream.iterator();
long offset = 0;
try {
while (it.hasNext())
offset = it.next().offset();
byte[] bytes = it.next().message();
String msg = new String(bytes, UTF-8
System.out.print(offset: + offset + ,msg: + msg);
System.out.println(Shutting down Thread: + m_threadNumber);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
四、實驗驗證
到此,相信大家對“Kafka Consumer 使用要注意什么”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!