久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何實現Kafka精確傳遞一次語義

174次閱讀
沒有評論

共計 3821 個字符,預計需要花費 10 分鐘才能閱讀完成。

如何實現 Kafka 精確傳遞一次語義,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

我們都知道 Kafka 的吞吐量很大,但是 Kafka 究竟會不會丟失消息呢?又會不會重復消費消息呢?

有很多公司因為業務要求必須保證消息不丟失、不重復的到達,比如無人機實時監控系統,當無人機闖入機場區域,我們必須立刻報警,不允許消息丟失。而無人機離開禁飛區域后我們需要將及時報警解除。如果消息重復了呢,我們是否需要復雜的邏輯來自己處理消息重復的情況呢,這種情況恐怕相當復雜而難以處理。但是如果我們能保證消息 exactly once,那么一切都容易得多。

下面我們來簡單了解一下消息傳遞語義,以及 kafka 的消息傳遞機制。

首先我們要了解的是 message delivery semantic   也就是消息傳遞語義。

這是一個通用的概念,也就是消息傳遞過程中消息傳遞的保證性。

分為三種:

最多一次(at most once): 消息可能丟失也可能被處理,但最多只會被處理一次。

可能丟失 不會重復

至少一次(at least once):   消息不會丟失,但可能被處理多次。

可能重復 不會丟失

精確傳遞一次(exactly once): 消息被處理且只會被處理一次。

不丟失 不重復 就一次

而 kafka 其實有兩次消息傳遞,一次生產者發送消息給 kafka,一次消費者去 kafka 消費消息。

兩次傳遞都會影響最終結果,

兩次都是精確一次,最終結果才是精確一次。

兩次中有一次會丟失消息,或者有一次會重復,那么最終的結果就是可能丟失或者重復的。

  一、Produce 端消息傳遞

這是 producer 端的代碼:

Properties properties = new Properties();
       properties.put(bootstrap.servers , kafka01:9092,kafka02:9092
       properties.put(acks , all
       properties.put(retries , 0);
       properties.put(batch.size , 16384);
       properties.put(linger.ms , 1);
       properties.put(buffer.memory , 33554432);
       properties.put(key.serializer , org.apache.kafka.common.serialization.StringSerializer
       properties.put(value.serializer , org.apache.kafka.common.serialization.StringSerializer
       KafkaProducer String, String kafkaProducer = new KafkaProducer String, String (properties);
       for (int i = 1; i = 600; i++) {
           kafkaProducer.send(new ProducerRecord String, String ( z_test_20190430 , testkafka0613 +i));
           System.out.println(testkafka +i);
       }
       kafkaProducer.close();

 

其中指定了一個參數 acks   可以有三個值選擇:

0:producer 完全不管 broker 的處理結果 回調也就沒有用了 并不能保證消息成功發送 但是這種吞吐量最高

all 或者 -1:leader broker 會等消息寫入 并且 ISR 都寫入后 才會響應,這種只要 ISR 有副本存活就肯定不會丟失,但吞吐量最低。

1:默認的值 leader broker 自己寫入后就響應,不會等待 ISR 其他的副本寫入,只要 leader broker 存活就不會丟失,即保證了不丟失,也保證了吞吐量。

所以設置為 0 時,實現了 at most once,而且從這邊看只要保證集群穩定的情況下,不設置為 0,消息不會丟失。

但是還有一種情況就是消息成功寫入,而這個時候由于網絡問題 producer 沒有收到寫入成功的響應,producer 就會開啟重試的操作,直到網絡恢復,消息就發送了多次。這就是 at least once 了。

kafka producer 的參數 acks 的默認值為 1,所以默認的 producer 級別是 at least once。并不能 exactly once。

二、Consumer 端消息傳遞

consumer 是靠 offset 保證消息傳遞的。

consumer 消費的代碼如下:

Properties props = new Properties();
       props.put(bootstrap.servers , kafka01:9092,kafka02:9092
       props.put(group.id , test
       props.put(enable.auto.commit , true
       props.put(auto.commit.interval.ms , 1000
       props.put(key.deserializer , org.apache.kafka.common.serialization.StringDeserializer
       props.put(value.deserializer , org.apache.kafka.common.serialization.StringDeserializer
       
       props.put(auto.offset.reset , earliest
       
       KafkaConsumer String, String consumer = new KafkaConsumer (props);
       consumer.subscribe(Arrays.asList( foo , bar));
     try{
       while (true) {
           ConsumerRecords String, String records = consumer.poll(1000);
           for (ConsumerRecord String, String record : records) {
               System.out.printf(offset = %d, key = %s, value = %s%n , record.offset(), record.key(), record.value());
           }
        }
       }finally{
         consumer.close();
       }

 

其中有一個參數是  enable.auto.commit

若設置為 true   consumer 在消費之前提交位移 就實現了 at most once

若是消費后提交 就實現了 at least once   默認的配置就是這個。

kafka consumer 的參數 enable.auto.commit 的默認值為 true  ,所以默認的 consumer 級別是 at least once。也并不能 exactly once。

  三、精確一次

通過了解 producer 端與 consumer 端的設置,我們發現 kafka 在兩端的默認配置都是 at least once,肯能重復,通過配置的話呢也不能做到 exactly once,好像 kafka 的消息一定會丟失或者重復的,是不是沒有辦法做到 exactly once 了呢?

確實在 kafka 0.11.0.0 版本之前 producer 端確實是不可能的,但是在 kafka 0.11.0.0 版本之后,kafka 正式推出了 idempotent  producer。

也就是冪等的 producer 還有對事務的支持。

  冪等的 producer

kafka 0.11.0.0 版本引入了 idempotent  producer 機制,在這個機制中同一消息可能被 producer 發送多次,但是在 broker 端只會寫入一次,他為每一條消息編號去重,而且對 kafka 開銷影響不大。

如何設置開啟呢?  需要設置 producer 端的新參數  enable.idempotent   為 true。

而多分區的情況,我們需要保證原子性的寫入多個分區,即寫入到多個分區的消息要么全部成功,要么全部回滾。

這時候就需要使用事務,在 producer 端設置 transcational.id 為一個指定字符串。

這樣冪等 producer 只能保證單分區上無重復消息;事務可以保證多分區寫入消息的完整性。

這樣 producer 端實現了 exactly once,那么 consumer 端呢?

consumer 端由于可能無法消費事務中所有消息,并且消息可能被刪除,所以事務并不能解決 consumer 端 exactly once 的問題,我們可能還是需要自己處理這方面的邏輯。比如自己管理 offset 的提交,不要自動提交,也是可以實現 exactly once 的。

還有一個選擇就是使用 kafka 自己的流處理引擎,也就是 Kafka Streams,

設置 processing.guarantee=exactly_once,就可以輕松實現 exactly once 了。

看完上述內容,你們掌握如何實現 Kafka 精確傳遞一次語義的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-17發表,共計3821字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 屯昌县| 睢宁县| 柘城县| 株洲市| 威宁| 都安| 康定县| 敦化市| 榆树市| 宁蒗| 遂川县| 武邑县| 孝昌县| 周至县| 监利县| 隆安县| 石家庄市| 休宁县| 资阳市| 怀来县| 武夷山市| 那坡县| 天全县| 丹巴县| 雅安市| 綦江县| 云南省| 留坝县| 津南区| 缙云县| 昌黎县| 舟曲县| 河源市| 抚州市| 遵化市| 峡江县| 晋州市| 江永县| 贵德县| 和顺县| 安化县|