久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Apache Kafka框架是怎樣的呢

174次閱讀
沒有評論

共計 3913 個字符,預計需要花費 10 分鐘才能閱讀完成。

Apache Kafka 框架是怎樣的呢,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

Apache Kafka 框架

下面將對 Kafka 做一個簡單的描述。

關于 Kafka

 Kafka 是 Apache 下的一個用于處理數據流的分布式消息框架,它擁有水平擴展、容錯、高效等特性,可以使用該框架來實現以下功能:

構建在系統間進行實時數據傳輸的通道。

構建對數據流行轉換或響應的實時應用。

 Kafka 的整體結構與 RabbitMQ 類似,消息生產者向 Kafka 服務器發送消息,Kafak 接收消息后,再投遞給消費者。在 Kafka 中,生產者消息會被發送到 Topic 中,Topic 保存著各類的數據,每一條數據都使用鍵、值進行保存。每一個 Topic 下都包含一個或多個物理分區(Partition),這些分區維護著消息的內容和索引,它們有可能被保存在不同的服務器上面。對于客戶端來說,無須關心數據如何被保存,只需要關心將消息發往哪個 Topic。

運行 Kafka 服務器

 Kafka 依賴了 ZooKeeper,啟動 Kafka 服務器前,要先啟動 ZooKeeper。本章所使用的 ZooKeeper 版本為 3.4.8,Kafka 版本為 2.11。下載兩個框架的壓縮包后解壓,分別得到 zookeeper-3.4.8 與 kafka_2.11-0.11.0.0 目錄。

  先進入 zookeeper-3.4.8/conf 目錄,將 zoo_sample.cfg 文件復制一份,并重命名為 zoo.cfg。使用命令行工具,進行 zookeeper-3.4.8/bin 目錄,運行“zkServer”命令,如果正常啟動,將會占用 2181 端口,命令行窗口不必關閉,接下來啟動 Kafka。

  使用命令行工具,進行“kafka_2.11-0.11.0.0/bin/windows”目錄,運行“kafka-server-start ../../config/server.properties”命令啟動 Kafka 服務器,如果正常啟動,將會占用 9092 端口。此處的 Kafka 就相當于前面章節中的 RabbitMQ 服務器,Kafka 同樣提供了 API 讓我們編寫客戶端。接下來,我們按照同樣的方式,使用 Kafka 的 API 來進行測試。

編寫生產者

  新建一個名稱為“kafka-test”的 Maven 項目,加入以下依賴:

  dependency 
  groupId org.apache.kafka /groupId 
  artifactId kafka-clients /artifactId 
  version 0.11.0.0 /version 
  /dependency 
  dependency 
  groupId org.slf4j /groupId 
  artifactId slf4j-log4j12 /artifactId 
  version 1.7.9 /version 
  /dependency

  新建生產者的運行類,請見代碼清單 8 -3。

  代碼清單 8 -3:codes\08\8.3\kafka-test\src\main\java\org\crazyit\cloud\ProducerMain.java

public class ProducerMain { public static void main(String[] args) throws Exception {
 //  配置信息
 Properties props = new Properties();
 props.put( bootstrap.servers ,  localhost:9092 
 //  設置數據 key 的序列化處理類
 props.put( key.serializer ,
  org.apache.kafka.common.serialization.StringSerializer 
 //  設置數據 value 的序列化處理類
 props.put( value.serializer ,
  org.apache.kafka.common.serialization.StringSerializer 
 //  創建生產者實例
 Producer String, String  producer = new KafkaProducer (props); 
 //  創建一條新的記錄,第一個參數為 Topic 名稱
 ProducerRecord record = new ProducerRecord String, String ( my-topic ,  userName ,  Angus 
 //  發送記錄
 producer.send(record);
 producer.close();
 }
}

  生產者的代碼較 RabbitMQ 的簡單,創建屬性實例,直接使用配置實例創建 Producer(生產者),再創建一個 ProducerRecord(記錄),最后直接發送。在創建記錄時,指定了向“my-topic”投遞消息,消息的 key 為“userName”,value 為“Angus”。消息發送后,Kafka 會在服務器上創建一個相應的 Topic。運行代碼清單 8 -3,將消息投遞到 Kafka 服務器的 Topic 中,接下來可以使用命令查看服務器的 Topic。

  使用命令行工具進入 kafka_2.11-0.11.0.0/bin/windows 目錄,輸入命令“kafka-topics –list –zookeeper localhost:2181”,看到當前 Kafka 服務器的 Topic,如圖 8 - 8 所示。

圖 8 -8 查看 Topic

  如果想刪除服務器上面的 Topic,可使用“kafka-topics –delete –zookeeper localhost:2181 –topic my-topic”命令,但在默認情況下,執行該命令只是將 Topic 標記為刪除,如果想真正刪除 Topic,需要修改 config/server.properties 文件,加入“delete.topic.enable=true”配置。

編寫消費者

  本例中生產者與消費同在一個項目,只是使用不同的啟動類。前面小節在編寫生產者時,指定消息發送到“my-topic”,消費者訂閱該 Topic,就可以獲取到消息,詳細請見代碼清單 8 -4。

  代碼清單 8 -4:codes\08\8.3\kafka-test\src\main\java\org\crazyit\cloud\ConsumerMain.java

public class ConsumerMain { public static void main(String[] args) {
 //  配置信息
 Properties props = new Properties();
 props.put( bootstrap.servers ,  localhost:9092 
 //  必須指定消費者組
 props.put( group.id ,  test 
 props.put( key.deserializer ,
  org.apache.kafka.common.serialization.StringDeserializer 
 props.put( value.deserializer ,
  org.apache.kafka.common.serialization.StringDeserializer 
 KafkaConsumer String, String  consumer = new KafkaConsumer (props);
 //  訂閱  my-topic  的消息
 consumer.subscribe(Arrays.asList( my-topic));
 //  到服務器中讀取記錄
 while (true) { ConsumerRecords String, String  records = consumer.poll(100);
 for (ConsumerRecord String, String  record : records) { System.out.println( key:   + record.key() +  , value:   + record.value());
 }
 }
 }
}

  設置了配置的信息后,創建一個 KafkaConsumer 實例,通過該實例訂閱“my-topic”的消息,最后使用 KafkaConsumer 的 poll 方法獲取服務器消息并輸出。運地代碼清單 8 -4,再運行代碼清單 8 -5,可以看到輸出如下:

key: userName, value: Angus

消費者組

  在編寫消費者時,需要指定消費者組的 id,關于消費者組,由于 Spring Cloud Stream 中也涉及這個概念,因此需要特別說明一下。

  消費者會為自己添加一個消費者組的標識,每一條發布到 Topic 的記錄,都會被交付給消費者組的一個消費者實例。如果多個消費者實例擁有相同的消費者組,那么這些記錄將會分配到各個消費者實例上,以達到負載均衡的目的。如果所有的消費者都有不同的消費者組,那么每一條記錄都會被廣播到全部的消費者進行處理。如果理解不了這段文字,請見圖 8 -9。

圖 8 -9 消費者組

  如圖 8 -9,如果消費者 A 與消費者 B 屬于同一個“消費者組”,那么當生產者發送一條消息過來時,僅會交給其中一個消費者處理;如果兩個消費者不屬于同一個消費者組,那么該消息都會發給他們(廣播)進行處理。

看完上述內容,你們掌握 Apache Kafka 框架是怎樣的呢的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計3913字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 同德县| 增城市| 曲阳县| 塘沽区| 寿光市| 海丰县| 红安县| 抚宁县| 岳普湖县| 涿州市| 共和县| 成武县| 武强县| 嵩明县| 温州市| 长宁县| 锦州市| 镇远县| 兴安盟| 璧山县| 浑源县| 昭平县| 前郭尔| 塔河县| 定安县| 璧山县| 宜州市| 原阳县| 烟台市| 大丰市| 嘉义市| 手机| 临城县| 忻州市| 南投市| 景东| 色达县| 凤山县| 咸宁市| 鄂托克前旗| 馆陶县|