共計 3913 個字符,預計需要花費 10 分鐘才能閱讀完成。
Apache Kafka 框架是怎樣的呢,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
Apache Kafka 框架
下面將對 Kafka 做一個簡單的描述。
關于 Kafka
Kafka 是 Apache 下的一個用于處理數據流的分布式消息框架,它擁有水平擴展、容錯、高效等特性,可以使用該框架來實現以下功能:
構建在系統間進行實時數據傳輸的通道。
構建對數據流行轉換或響應的實時應用。
Kafka 的整體結構與 RabbitMQ 類似,消息生產者向 Kafka 服務器發送消息,Kafak 接收消息后,再投遞給消費者。在 Kafka 中,生產者消息會被發送到 Topic 中,Topic 保存著各類的數據,每一條數據都使用鍵、值進行保存。每一個 Topic 下都包含一個或多個物理分區(Partition),這些分區維護著消息的內容和索引,它們有可能被保存在不同的服務器上面。對于客戶端來說,無須關心數據如何被保存,只需要關心將消息發往哪個 Topic。
運行 Kafka 服務器
Kafka 依賴了 ZooKeeper,啟動 Kafka 服務器前,要先啟動 ZooKeeper。本章所使用的 ZooKeeper 版本為 3.4.8,Kafka 版本為 2.11。下載兩個框架的壓縮包后解壓,分別得到 zookeeper-3.4.8 與 kafka_2.11-0.11.0.0 目錄。
先進入 zookeeper-3.4.8/conf 目錄,將 zoo_sample.cfg 文件復制一份,并重命名為 zoo.cfg。使用命令行工具,進行 zookeeper-3.4.8/bin 目錄,運行“zkServer”命令,如果正常啟動,將會占用 2181 端口,命令行窗口不必關閉,接下來啟動 Kafka。
使用命令行工具,進行“kafka_2.11-0.11.0.0/bin/windows”目錄,運行“kafka-server-start ../../config/server.properties”命令啟動 Kafka 服務器,如果正常啟動,將會占用 9092 端口。此處的 Kafka 就相當于前面章節中的 RabbitMQ 服務器,Kafka 同樣提供了 API 讓我們編寫客戶端。接下來,我們按照同樣的方式,使用 Kafka 的 API 來進行測試。
編寫生產者
新建一個名稱為“kafka-test”的 Maven 項目,加入以下依賴:
dependency
groupId org.apache.kafka /groupId
artifactId kafka-clients /artifactId
version 0.11.0.0 /version
/dependency
dependency
groupId org.slf4j /groupId
artifactId slf4j-log4j12 /artifactId
version 1.7.9 /version
/dependency
新建生產者的運行類,請見代碼清單 8 -3。
代碼清單 8 -3:codes\08\8.3\kafka-test\src\main\java\org\crazyit\cloud\ProducerMain.java
public class ProducerMain { public static void main(String[] args) throws Exception {
// 配置信息
Properties props = new Properties();
props.put( bootstrap.servers , localhost:9092
// 設置數據 key 的序列化處理類
props.put( key.serializer ,
org.apache.kafka.common.serialization.StringSerializer
// 設置數據 value 的序列化處理類
props.put( value.serializer ,
org.apache.kafka.common.serialization.StringSerializer
// 創建生產者實例
Producer String, String producer = new KafkaProducer (props);
// 創建一條新的記錄,第一個參數為 Topic 名稱
ProducerRecord record = new ProducerRecord String, String ( my-topic , userName , Angus
// 發送記錄
producer.send(record);
producer.close();
}
}
生產者的代碼較 RabbitMQ 的簡單,創建屬性實例,直接使用配置實例創建 Producer(生產者),再創建一個 ProducerRecord(記錄),最后直接發送。在創建記錄時,指定了向“my-topic”投遞消息,消息的 key 為“userName”,value 為“Angus”。消息發送后,Kafka 會在服務器上創建一個相應的 Topic。運行代碼清單 8 -3,將消息投遞到 Kafka 服務器的 Topic 中,接下來可以使用命令查看服務器的 Topic。
使用命令行工具進入 kafka_2.11-0.11.0.0/bin/windows 目錄,輸入命令“kafka-topics –list –zookeeper localhost:2181”,看到當前 Kafka 服務器的 Topic,如圖 8 - 8 所示。
圖 8 -8 查看 Topic
如果想刪除服務器上面的 Topic,可使用“kafka-topics –delete –zookeeper localhost:2181 –topic my-topic”命令,但在默認情況下,執行該命令只是將 Topic 標記為刪除,如果想真正刪除 Topic,需要修改 config/server.properties 文件,加入“delete.topic.enable=true”配置。
編寫消費者
本例中生產者與消費同在一個項目,只是使用不同的啟動類。前面小節在編寫生產者時,指定消息發送到“my-topic”,消費者訂閱該 Topic,就可以獲取到消息,詳細請見代碼清單 8 -4。
代碼清單 8 -4:codes\08\8.3\kafka-test\src\main\java\org\crazyit\cloud\ConsumerMain.java
public class ConsumerMain { public static void main(String[] args) {
// 配置信息
Properties props = new Properties();
props.put( bootstrap.servers , localhost:9092
// 必須指定消費者組
props.put( group.id , test
props.put( key.deserializer ,
org.apache.kafka.common.serialization.StringDeserializer
props.put( value.deserializer ,
org.apache.kafka.common.serialization.StringDeserializer
KafkaConsumer String, String consumer = new KafkaConsumer (props);
// 訂閱 my-topic 的消息
consumer.subscribe(Arrays.asList( my-topic));
// 到服務器中讀取記錄
while (true) { ConsumerRecords String, String records = consumer.poll(100);
for (ConsumerRecord String, String record : records) { System.out.println( key: + record.key() + , value: + record.value());
}
}
}
}
設置了配置的信息后,創建一個 KafkaConsumer 實例,通過該實例訂閱“my-topic”的消息,最后使用 KafkaConsumer 的 poll 方法獲取服務器消息并輸出。運地代碼清單 8 -4,再運行代碼清單 8 -5,可以看到輸出如下:
key: userName, value: Angus
消費者組
在編寫消費者時,需要指定消費者組的 id,關于消費者組,由于 Spring Cloud Stream 中也涉及這個概念,因此需要特別說明一下。
消費者會為自己添加一個消費者組的標識,每一條發布到 Topic 的記錄,都會被交付給消費者組的一個消費者實例。如果多個消費者實例擁有相同的消費者組,那么這些記錄將會分配到各個消費者實例上,以達到負載均衡的目的。如果所有的消費者都有不同的消費者組,那么每一條記錄都會被廣播到全部的消費者進行處理。如果理解不了這段文字,請見圖 8 -9。
圖 8 -9 消費者組
如圖 8 -9,如果消費者 A 與消費者 B 屬于同一個“消費者組”,那么當生產者發送一條消息過來時,僅會交給其中一個消費者處理;如果兩個消費者不屬于同一個消費者組,那么該消息都會發給他們(廣播)進行處理。
看完上述內容,你們掌握 Apache Kafka 框架是怎樣的呢的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!