共計(jì) 4551 個(gè)字符,預(yù)計(jì)需要花費(fèi) 12 分鐘才能閱讀完成。
本篇文章給大家分享的是有關(guān)如何實(shí)現(xiàn) Kafka 的入門,丸趣 TV 小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著丸趣 TV 小編一起來看看吧。
一、入門 1. 簡(jiǎn)介
Kafka is a distributed, partitioned, replicated commit log service。它提供了類似于 JMS 的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是 JMS 規(guī)范的實(shí)現(xiàn)。kafka 對(duì)消息保存時(shí)根據(jù) Topic 進(jìn)行歸類,發(fā)送消息者成為 Producer, 消息接受者成為 Consumer, 此外 kafka 集群有多個(gè) kafka 實(shí)例組成,每個(gè)實(shí)例 (server) 成為 broker。無論是 kafka 集群,還是 producer 和 consumer 都依賴于 zookeeper 來保證系統(tǒng)可用性集群保存一些 meta 信息。
下面這張圖描述更準(zhǔn)確。
主要特性:
1)消息持久化
要從大數(shù)據(jù)中獲取真正的價(jià)值,那么不能丟失任何信息。Apache Kafka 設(shè)計(jì)上是時(shí)間復(fù)雜度 O(1) 的磁盤結(jié)構(gòu),它提供了常量時(shí)間的性能,即使是存儲(chǔ)海量的信息(TB 級(jí))。
2)高吞吐
記住大數(shù)據(jù),Kafka 的設(shè)計(jì)是工作在標(biāo)準(zhǔn)硬件之上,支持每秒數(shù)百萬的消息。
3)分布式
Kafka 明確支持在 Kafka 服務(wù)器上的消息分區(qū),以及在消費(fèi)機(jī)器集群上的分發(fā)消費(fèi),維護(hù)每個(gè)分區(qū)的排序語義。
4)多客戶端支持
Kafka 系統(tǒng)支持與來自不同平臺(tái)(如 java、.NET、PHP、Ruby 或 Python 等)的客戶端相集成。
5)實(shí)時(shí)
生產(chǎn)者線程產(chǎn)生的消息對(duì)消費(fèi)者線程應(yīng)該立即可見,此特性對(duì)基于事件的系統(tǒng)(比如 CEP 系統(tǒng))是至關(guān)重要的。
2. 概念 Topics/logs
一個(gè) Topic 可以認(rèn)為是一類消息,每個(gè) topic 將被分成多個(gè) partition(區(qū)), 每個(gè) partition 在存儲(chǔ)層面是 append log 文件。任何發(fā)布到此 partition 的消息都會(huì)被直接追加到 log 文件的尾部,每條消息在文件中的位置稱為 offset(偏移量),offset 為一個(gè) long 型數(shù)字,它是唯一標(biāo)記一條消息。它唯一的標(biāo)記一條消息。kafka 并沒有提供其他額外的索引機(jī)制來存儲(chǔ) offset,因?yàn)樵?kafka 中幾乎不允許對(duì)消息進(jìn)行“隨機(jī)讀寫”。
kafka 和 JMS 實(shí)現(xiàn) (activeMQ) 不同的是: 即使消息被消費(fèi), 消息仍然不會(huì)被立即刪除. 日志文件將會(huì)根據(jù) broker 中的配置要求, 保留一定的時(shí)間之后刪除; 比如 log 文件保留 2 天, 那么兩天后, 文件會(huì)被清除, 無論其中的消息是否被消費(fèi).kafka 通過這種簡(jiǎn)單的手段, 來釋放磁盤空間, 以及減少消息消費(fèi)之后對(duì)文件內(nèi)容改動(dòng)的磁盤 IO 開支.
對(duì)于 consumer 而言, 它需要保存消費(fèi)消息的 offset, 對(duì)于 offset 的保存和使用, 有 consumer 來控制; 當(dāng) consumer 正常消費(fèi)消息時(shí),offset 將會(huì) 線性 的向前驅(qū)動(dòng), 即消息將依次順序被消費(fèi). 事實(shí)上 consumer 可以使用任意順序消費(fèi)消息, 它只需要將 offset 重置為任意值..(offset 將會(huì)保存在 zookeeper 中, 參見下文)
kafka 集群幾乎不需要維護(hù)任何 consumer 和 producer 狀態(tài)信息, 這些信息有 zookeeper 保存; 因此 producer 和 consumer 的客戶端實(shí)現(xiàn)非常輕量級(jí), 它們可以隨意離開, 而不會(huì)對(duì)集群造成額外的影響.
partitions 的設(shè)計(jì)目的有多個(gè). 最根本原因是 kafka 基于文件存儲(chǔ). 通過分區(qū), 可以將日志內(nèi)容分散到多個(gè) server 上, 來避免文件尺寸達(dá)到單機(jī)磁盤的上限, 每個(gè) partiton 都會(huì)被當(dāng)前 server(kafka 實(shí)例)保存; 可以將一個(gè) topic 切分多任意多個(gè) partitions, 來消息保存 / 消費(fèi)的效率. 此外越多的 partitions 意味著可以容納更多的 consumer, 有效提升并發(fā)消費(fèi)的能力.(具體原理參見下文).
Distribution
一個(gè) Topic 的多個(gè) partitions, 被分布在 kafka 集群中的多個(gè) server 上; 每個(gè) server(kafka 實(shí)例)負(fù)責(zé) partitions 中消息的讀寫操作; 此外 kafka 還可以配置 partitions 需要備份的個(gè)數(shù)(replicas), 每個(gè) partition 將會(huì)被備份到多臺(tái)機(jī)器上, 以提高可用性.
基于 replicated 方案, 那么就意味著需要對(duì)多個(gè)備份進(jìn)行調(diào)度; 每個(gè) partition 都有一個(gè) server 為 leader leader 負(fù)責(zé)所有的讀寫操作, 如果 leader 失效, 那么將會(huì)有其他 follower 來接管(成為新的 leader);follower 只是單調(diào)的和 leader 跟進(jìn), 同步消息即可.. 由此可見作為 leader 的 server 承載了全部的請(qǐng)求壓力, 因此從集群的整體考慮, 有多少個(gè) partitions 就意味著有多少個(gè) leader ,kafka 會(huì)將 leader 均衡的分散在每個(gè)實(shí)例上, 來確保整體的性能穩(wěn)定.
Producers
Producer 將消息發(fā)布到指定的 Topic 中, 同時(shí) Producer 也能決定將此消息歸屬于哪個(gè) partition; 比如基于 round-robin 方式或者通過其他的一些算法等.
Consumers
本質(zhì)上 kafka 只支持 Topic. 每個(gè) consumer 屬于一個(gè) consumer group; 反過來說, 每個(gè) group 中可以有多個(gè) consumer. 發(fā)送到 Topic 的消息, 只會(huì)被訂閱此 Topic 的每個(gè) group 中的一個(gè) consumer 消費(fèi).
如果所有的 consumer 都具有相同的 group, 這種情況和 queue 模式很像; 消息將會(huì)在 consumers 之間負(fù)載均衡.
如果所有的 consumer 都具有不同的 group, 那這就是 發(fā)布 - 訂閱 消息將會(huì)廣播給所有的消費(fèi)者.
在 kafka 中, 一個(gè) partition 中的消息只會(huì)被 group 中的一個(gè) consumer 消費(fèi); 每個(gè) group 中 consumer 消息消費(fèi)互相獨(dú)立; 我們可以認(rèn)為一個(gè) group 是一個(gè) 訂閱 者, 一個(gè) Topic 中的每個(gè) partions, 只會(huì)被一個(gè) 訂閱者 中的一個(gè) consumer 消費(fèi), 不過一個(gè) consumer 可以消費(fèi)多個(gè) partitions 中的消息.kafka 只能保證一個(gè) partition 中的消息被某個(gè) consumer 消費(fèi)時(shí), 消息是順序的. 事實(shí)上, 從 Topic 角度來說, 消息仍不是有序的.
kafka 的設(shè)計(jì)原理決定, 對(duì)于一個(gè) topic, 同一個(gè) group 中不能有多于 partitions 個(gè)數(shù)的 consumer 同時(shí)消費(fèi), 否則將意味著某些 consumer 將無法得到消息.
Guarantees
1) 發(fā)送到 partitions 中的消息將會(huì)按照它接收的順序追加到日志中
2) 對(duì)于消費(fèi)者而言, 它們消費(fèi)消息的順序和日志中消息順序一致.
3) 如果 Topic 的 replication factor 為 N, 那么允許 N - 1 個(gè) kafka 實(shí)例失效.
3. 適用場(chǎng)景 1、Messaging
對(duì)于一些常規(guī)的消息系統(tǒng),kafka 是個(gè)不錯(cuò)的選擇;partitons/replication 和容錯(cuò), 可以使 kafka 具有良好的擴(kuò)展性和性能優(yōu)勢(shì). 不過到目前為止, 我們應(yīng)該很清楚認(rèn)識(shí)到,kafka 并沒有提供 JMS 中的 事務(wù)性 消息傳輸擔(dān)保(消息確認(rèn)機(jī)制) 消息分組 等企業(yè)級(jí)特性;kafka 只能使用作為 常規(guī) 的消息系統(tǒng), 在一定程度上, 尚未確保消息的發(fā)送與接收絕對(duì)可靠(比如, 消息重發(fā), 消息發(fā)送丟失等)
2、Websit activity tracking
kafka 可以作為 網(wǎng)站活性跟蹤 的最佳工具; 可以將網(wǎng)頁 / 用戶操作等信息發(fā)送到 kafka 中. 并實(shí)時(shí)監(jiān)控, 或者離線統(tǒng)計(jì)分析等
3、Metrics
Kafka 通常被用于可操作的監(jiān)控?cái)?shù)據(jù)。這包括從分布式應(yīng)用程序來的聚合統(tǒng)計(jì)用來生產(chǎn)集中的運(yùn)營數(shù)據(jù)提要。
4、Log Aggregation
kafka 的特性決定它非常適合作為 日志收集中心 application 可以將操作日志 批量 異步 的發(fā)送到 kafka 集群中, 而不是保存在本地或者 DB 中;kafka 可以批量提交消息 / 壓縮消息等, 這對(duì) producer 端而言, 幾乎感覺不到性能的開支. 此時(shí) consumer 端可以使 hadoop 等其他系統(tǒng)化的存儲(chǔ)和分析系統(tǒng).
4. 命令
1. 啟動(dòng) Server
Kafka 依賴 ZK 服務(wù)
nohup bin/kafka-server-start.sh config/server.properties
2. 創(chuàng)建 Topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic page_visits
3. 查看命令
bin/kafka-topics.sh –list –zookeeper localhost:2181
4. 發(fā)送消息
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic page_visits
5. 消費(fèi)消息
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic page_visits –from-beginning
6. 多 Broker 方式
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic visits
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic visits
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic visits
my message test1
my message test2
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic visits
7. 停止服務(wù)
pkill -9 -f config/server.properties
8. 刪除無用的 topic
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand –topic visits –zookeeper sjxt-hd02:2181,sjxt-hd03:2181,sjxt-hd04:2181
beta in 0.8.1
bin/kafka-topics.sh --zookeeper zk_host:port --delete --topic my_topic_name
以上就是如何實(shí)現(xiàn) Kafka 的入門,丸趣 TV 小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注丸趣 TV 行業(yè)資訊頻道。