共計 3789 個字符,預計需要花費 10 分鐘才能閱讀完成。
丸趣 TV 小編給大家分享一下 spring kakfa 如何集成,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
一、生產端
1.1 kafka-producer.xml 配置說明
!-- spring 的屬性加載器,加載多個 properties 文件中的屬性 , 如果只有一個 properties 文件則用 context / 就行了,用了這個加載器過后不用在其他 xml 中再使用了 --
bean id= propertyConfigurer
> !-- 定義 producer 的參數 --
bean id= producerProperties >1.2 kafka-producer.properties 屬性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
defaultTopic=topic-test
1.3 生產端接口封裝說明:
1)類名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer
2)方法:
/**
* 發送信息(不分區) * @param data 要發送的數據
* @return 返回一個 map。如果成功 code 為 0, 其他則為失敗
*/
public Map String, Object sendDefault(Object data);
/**
* 發送信息 (不分區)
* @param key 要發送的鍵
* @param data 要發送的數據
* @return 返回一個 map。如果成功 code 為 0, 其他則為失敗
*/
public Map String, Object sendDefault(Object key, Object data);
/**
* 發送信息 (分區)
* @param partitionNum 分區數 (大于 1), 請注意分區數是在 topic 創建的時候就指定了,不能改變了
* @param key 要發送的鍵
* @param data 要發送的數據
* @return 返回一個 map。如果成功 code 為 0, 其他則為失敗
*/
public Map String, Object sendDefault(int partitionNum, Object key, Object data);
/**
* 發送信息 (不分區)
* @param topic 發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
* @param data 要發送的數據
* @return 返回一個 map。如果成功 code 為 0, 其他則為失敗
*/
public Map String, Object sendMessage(String topic, Object data);
/**
* 發送信息 (不分區)
* @param topic 發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
* @param key 要發送的鍵
* @param data 要發送的數據
* @return 返回一個 map。如果成功 code 為 0, 其他則為失敗
* */
public Map String, Object sendMessage(String topic, Object key, Object data);
/**
* 發送信息 (分區)
* @param topic 發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
* @param partitionNum 分區數 (大于 1), 請注意分區數是在 topic 創建的時候就指定了,不能改變了
* @param data 要發送的數據
* @return 返回一個 map。如果成功 code 為 0, 其他則為失敗
*/
public Map String, Object sendMessage(String topic, Integer partitionNum, Object data);
/**
* 發送信息 (分區)
* @param topic 發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
* @param key 要發送的鍵
* @param value 要發送的數據
* @param partitionNum 分區數 (大于 1), 請注意分區數是在 topic 創建的時候就指定了,不能改變了
* @return 返回一個 map。如果成功 code 為 0, 其他則為失敗
* */
public Map String, Object sendMessage(String topic, int partitionNum, Object key, Object value);
二、消費端
2.1 kafka-consumer.xml 配置說明
!-- 定義 consumer 的參數 --
bean id= consumerProperties >
property name= ackCount value= 90 / --
!-- property name= ackMode value= TIME /
property name= ackTime value= 5000 / --
/bean
!-- 創建單實例 KafkaMessageListenerContainer--
!-- bean id= messageListenerContainer_trade > !-- 創建多實例 ConcurrentMessageListenerContainer--
bean id= messageListenerContainer >2.2 kafka-consumer.properties 屬性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
enable.auto.commit=false
auto.commit.interval.ms=1000
session.timeout.ms=15000
topicName=ahao-test
2.3 消費端接口封裝說明
1)類名:
com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient
2)對外提供抽象方法(根據不同的業務實現):
public abstract void onConsumer(ConsumerRecord String, String record);
3)實現說明:各業務線通過繼承該類實現該抽象方法;
三、Kafka 技術概覽
3.1 Kafka 的特性
高吞吐量、低延遲:kafka 每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
可擴展性:kafka 集群支持熱擴展
持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份防止數據丟失
容錯性:允許集群中節點失敗(若副本數量為 n, 則允許 n - 1 個節點失敗)
高并發:支持數千個客戶端同時讀寫
3.2 Kafka 架構組件
Kafka 中發布訂閱的對象是 topic。我們可以為每類數據創建一個 topic,把向 topic 發布消息的客戶端稱作 producer,從 topic 訂閱消息的客戶端稱作 consumer。Producers 和 consumers 可以同時從多個 topic 讀寫數據。一個 kafka 集群由一個或多個 broker 服務器組成,它負責持久化和備份具體的 kafka 消息。
topic:消息存放的目錄即主題
Producer:生產消息到 topic 的一方
Consumer:訂閱 topic 消費消息的一方
Broker:Kafka 的服務實例就是一個 broker
3.3 kafka 應用場景
日志收集:一個公司可以用 Kafka 可以收集各種服務的 log,通過 kafka 以統一接口服務的方式開放給各種 consumer,例如 hadoop、Hbase、Solr 等。
消息系統:解耦和生產者和消費者、緩存消息等。
用戶活動跟蹤:Kafka 經常被用來記錄 web 用戶或者 app 用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到 kafka 的 topic 中,然后訂閱者通過訂閱這些 topic 來做實時的監控分析,或者裝載到 hadoop、數據倉庫中做離線分析和挖掘。
運營指標:Kafka 也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
流式處理:比如 spark streaming 和 storm
事件源
以上是“spring kakfa 如何集成”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注丸趣 TV 行業資訊頻道!