久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

spring kakfa如何集成

171次閱讀
沒有評論

共計 3789 個字符,預計需要花費 10 分鐘才能閱讀完成。

丸趣 TV 小編給大家分享一下 spring kakfa 如何集成,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

一、生產端

1.1 kafka-producer.xml 配置說明

!-- spring 的屬性加載器,加載多個 properties 文件中的屬性  ,  如果只有一個 properties 文件則用 context / 就行了,用了這個加載器過后不用在其他 xml 中再使用了 -- 
 bean id= propertyConfigurer 
  >
 !--  定義 producer 的參數  -- 
 bean id= producerProperties   >

1.2 kafka-producer.properties 屬性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
defaultTopic=topic-test

1.3 生產端接口封裝說明:

1)類名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer

2)方法:

/**
 *  發送信息(不分區) * @param data  要發送的數據
 * @return  返回一個 map。如果成功 code 為 0, 其他則為失敗
 */
public Map String, Object  sendDefault(Object data);
/**
 *  發送信息 (不分區)
 * @param key  要發送的鍵
 * @param data  要發送的數據
 * @return  返回一個 map。如果成功 code 為 0, 其他則為失敗
 */
public Map String, Object  sendDefault(Object key, Object data);
/**
 *  發送信息 (分區)
 * @param partitionNum  分區數 (大于 1), 請注意分區數是在 topic 創建的時候就指定了,不能改變了
 * @param key  要發送的鍵
 * @param data  要發送的數據
 * @return  返回一個 map。如果成功 code 為 0, 其他則為失敗
 */
public Map String, Object  sendDefault(int partitionNum, Object key, Object data);
/**
 *  發送信息 (不分區)
 * @param topic  發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
 * @param data  要發送的數據
 * @return  返回一個 map。如果成功 code 為 0, 其他則為失敗
 */
public Map String, Object  sendMessage(String topic, Object data);
/**
 *  發送信息 (不分區)
 * @param topic  發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
 * @param key  要發送的鍵
 * @param data  要發送的數據
 * @return  返回一個 map。如果成功 code 為 0, 其他則為失敗
 * */
public Map String, Object  sendMessage(String topic, Object key, Object data);
/**
 *  發送信息 (分區)
 * @param topic  發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
 * @param partitionNum  分區數 (大于 1), 請注意分區數是在 topic 創建的時候就指定了,不能改變了
 * @param data  要發送的數據
 * @return  返回一個 map。如果成功 code 為 0, 其他則為失敗
 */
public Map String, Object  sendMessage(String topic, Integer partitionNum, Object data);
/**
 *  發送信息 (分區)
 * @param topic  發送目的 topic 名稱, 如果 topic 為 null 或者是為 , 則會使用 xml 中配置的 defaultTopic
 * @param key  要發送的鍵
 * @param value  要發送的數據
 * @param partitionNum  分區數 (大于 1), 請注意分區數是在 topic 創建的時候就指定了,不能改變了
 * @return  返回一個 map。如果成功 code 為 0, 其他則為失敗
 * */
public Map String, Object  sendMessage(String topic, int partitionNum, Object key, Object value);

二、消費端

2.1 kafka-consumer.xml 配置說明

!--  定義 consumer 的參數  -- 
 bean id= consumerProperties   >
 
property name= ackCount  value= 90 / -- 
  !-- property name= ackMode  value= TIME / 
  property name= ackTime  value= 5000 / -- 
 /bean 
 !--  創建單實例 KafkaMessageListenerContainer--
!-- bean id= messageListenerContainer_trade   >
 !--  創建多實例 ConcurrentMessageListenerContainer--
bean id= messageListenerContainer   >

2.2 kafka-consumer.properties 屬性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
enable.auto.commit=false
auto.commit.interval.ms=1000
session.timeout.ms=15000
topicName=ahao-test

2.3 消費端接口封裝說明

1)類名:com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient

2)對外提供抽象方法(根據不同的業務實現):

public abstract void onConsumer(ConsumerRecord String, String  record);

3)實現說明:各業務線通過繼承該類實現該抽象方法;

三、Kafka 技術概覽

3.1 Kafka 的特性

高吞吐量、低延遲:kafka 每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒

可擴展性:kafka 集群支持熱擴展

持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份防止數據丟失

容錯性:允許集群中節點失敗(若副本數量為 n, 則允許 n - 1 個節點失敗)

高并發:支持數千個客戶端同時讀寫

3.2 Kafka 架構組件 

       Kafka 中發布訂閱的對象是 topic。我們可以為每類數據創建一個 topic,把向 topic 發布消息的客戶端稱作 producer,從 topic 訂閱消息的客戶端稱作 consumer。Producers 和                consumers 可以同時從多個 topic 讀寫數據。一個 kafka 集群由一個或多個 broker 服務器組成,它負責持久化和備份具體的 kafka 消息。

topic:消息存放的目錄即主題

Producer:生產消息到 topic 的一方

Consumer:訂閱 topic 消費消息的一方

Broker:Kafka 的服務實例就是一個 broker

3.3 kafka 應用場景

日志收集:一個公司可以用 Kafka 可以收集各種服務的 log,通過 kafka 以統一接口服務的方式開放給各種 consumer,例如 hadoop、Hbase、Solr 等。

消息系統:解耦和生產者和消費者、緩存消息等。

用戶活動跟蹤:Kafka 經常被用來記錄 web 用戶或者 app 用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到 kafka 的 topic 中,然后訂閱者通過訂閱這些 topic 來做實時的監控分析,或者裝載到 hadoop、數據倉庫中做離線分析和挖掘。

運營指標:Kafka 也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。

流式處理:比如 spark streaming 和 storm

事件源

以上是“spring kakfa 如何集成”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注丸趣 TV 行業資訊頻道!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計3789字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 邯郸市| 潢川县| 通榆县| 凉山| 辽阳市| 贵溪市| 兴海县| 凌海市| 吉水县| 酒泉市| 黎平县| 富蕴县| 庆城县| 金塔县| 江口县| 肥城市| 临泉县| 镇宁| 山东省| 响水县| 镇雄县| 平远县| 霍州市| 南木林县| 绥中县| 普安县| 台州市| 白银市| 普兰店市| 汽车| 乡宁县| 通海县| 林甸县| 广昌县| 济阳县| 姜堰市| 和硕县| 托克逊县| 都匀市| 龙山县| 商城县|