久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

kafka consumer怎么使用

154次閱讀
沒有評論

共計 4495 個字符,預計需要花費 12 分鐘才能閱讀完成。

這篇文章主要介紹“kafka consumer 怎么使用”,在日常操作中,相信很多人在 kafka consumer 怎么使用問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”kafka consumer 怎么使用”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

consumer 作為 kafka 當中一個重要元素,它的常用操作并不復雜,說白了無非就是 2 點,1、把數據 poll 出來,2、把位置標記上。我們找到 kafka 的 java api doc,找到了官方提供的幾種 consumer 操作的例子,逐一進行分析,看看都有幾種操作類型。

Automatic Offset Committing

自動 Offset 提交

這個例子顯示了一個基于 offset 自動提交的 consumer api 的簡單應用。

Properties props = new Properties();
 props.put( bootstrap.servers ,  localhost:9092 
 props.put( group.id ,  test 
 props.put( enable.auto.commit ,  true 
 props.put( auto.commit.interval.ms ,  1000 
 props.put( session.timeout.ms ,  30000 
 props.put( key.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 props.put( value.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 KafkaConsumer String, String  consumer = new KafkaConsumer (props);
 consumer.subscribe(Arrays.asList( foo ,  bar));
 while (true) { ConsumerRecords String, String  records = consumer.poll(100);
 for (ConsumerRecord String, String  record : records)
 System.out.printf(offset = %d, key = %s, value = %s , record.offset(), record.key(), record.value());
 }

enable.auto.commit 意味著 offset 將會得到自動提交,而這個自動提交的時間間隔由 auto.commit.interval.ms 來進行控制。

客戶端通過 bootstrap.servers 的配置來連接服務器,這個配值當中可以是一個或多個 broker,需要注意的是,這個配置僅僅用來讓客戶端找到我們的 server 集群,而不需要把集群當中的所有服務器地址都列上。

在這個例子當中,客戶端作為 test group 的一員,訂閱了 foo 和 bar2 個 topic。

(這一段直接翻譯很蹩腳,我會試著根據自己的理解翻譯出來)首先假設,foo 和 bar 這 2 個 topic,都分別有 3 個 partitions,同時我們將上面的代碼在我們的機器上起 3 個進程,也就是說,在 test group 當中,目前有了 3 個 consumer,一般來講,這 3 個 consumer 會分別獲得 foo 和 bar 的各一個 partitions,這是前提。3 個 consumer 會周期性的執行一個 poll 的動作(這個動作當中隱含的有一個 heartbeat 的發送,來告訴 cluster 我是活的),這樣 3 個 consumer 會持續的保有他們對分配給自己的 partition 的訪問的權利,如果某一個 consumer 失效了,也就是 poll 不再執行了,cluster 會在一段時間(session.timeout.ms)之后把 partitions 分配給其他的 consumer。

反序列化的設置,定義了如何轉化 bytes,這里我們把 key 和 value 都直接轉化為 string。

Manual Offset Control

手動的 offset 控制

除了周期性的自動提交 offset 之外,用戶也可以在消息被消費了之后提交他們的 offset。

某些情況下,消息的消費是和某些處理邏輯相關聯的,我們可以用這樣的方式,手動的在處理邏輯結束之后提交 offset。

簡要地說,在這個例子當中,我們希望每次至少消費 200 條消息并將它們插入數據庫,之后再提交 offset。如果仍然使用前面的自動提交方式,就可能出現消息已經被消費,但是插入數據庫失敗的情況。這里可以視作一個簡單的事務封裝。

但是,有沒有另一種可能性,在插入數據庫成功之后,提交 offset 之前,發生了錯誤,或者說是提交 offset 本身發生了錯誤,那么就可能出現某些消息被重復消費的情況。

個人認為這段話說的莫名其妙,簡單地說,采用這樣的方式,消息不會被丟失,但是有可能出現重復消費。

Properties props = new Properties();
 props.put( bootstrap.servers ,  localhost:9092 
 props.put( group.id ,  test 
 props.put( enable.auto.commit ,  false 
 props.put( auto.commit.interval.ms ,  1000 
 props.put( session.timeout.ms ,  30000 
 props.put( key.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 props.put( value.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 KafkaConsumer String, String  consumer = new KafkaConsumer (props);
 consumer.subscribe(Arrays.asList( foo ,  bar));
 final int minBatchSize = 200;
 List ConsumerRecord String, String  buffer = new ArrayList ();
 while (true) { ConsumerRecords String, String  records = consumer.poll(100);
 for (ConsumerRecord String, String  record : records) { buffer.add(record);
 }
 if (buffer.size()  = minBatchSize) { insertIntoDb(buffer);
 consumer.commitSync();
 buffer.clear();
 }
 }

上面的例子當中,我們用 commitSync 來標記所有的消息;在有些情況下,我們可能希望更加精確的控制 offset,那么在下面的例子當中,我們可以在每一個 partition 當中分別控制 offset 的提交。

try { while(running) { ConsumerRecords String, String  records = consumer.poll(Long.MAX_VALUE);
 for (TopicPartition partition : records.partitions()) { List ConsumerRecord String, String  partitionRecords = records.records(partition);
 for (ConsumerRecord String, String  record : partitionRecords) { System.out.println(record.offset() +  :   + record.value());
 }
 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
 }
 }
 } finally { consumer.close();
 }

注意:提交的 offset 應該是 next message,所以,提交的時候需要在當前最后一條的基礎上 +1.

Manual Partition Assignment

手動的分區分配

前面的例子當中,我們訂閱一個 topic,然后讓 kafka 把該 topic 當中的不同 partitions,公平的在一個 consumer group 內部進行分配。那么,在某些情況下,我們希望能夠具體的指定 partitions 的分配關系。

如果某個進程在本地管理了和 partition 相關的狀態,那么它只需要獲得跟他相關 partition。

如果某個進程自身具備高可用性,那么就不需要 kafka 來檢測錯誤并重新分配 partition,因為消費者進程會在另一臺設備上重新啟動。

要使用這種模式,可以用 assign 方法來代替 subscribe,具體指定一個 partitions 列表。

String topic =  foo 
 TopicPartition partition0 = new TopicPartition(topic, 0);
 TopicPartition partition1 = new TopicPartition(topic, 1);
 consumer.assign(Arrays.asList(partition0, partition1));

分配之后,就可以像前面的例子一樣,在循環當中調用 poll 來消費消息。手動的分區分配不需要組協調,所以消費進程失效之后,不會引發 partition 的重新分配,每一個消費者都是獨立工作的,即使它和其他消費者屬于同一個 group。為了避免 offset 提交的沖突,在這種情況下,通常我們需要保證每一個 consumer 使用自己的 group id。

需要注意的是,手動 partition 分配和通過 subscribe 實現的動態的分區分配,2 種方式是不能混合使用的。

到此,關于“kafka consumer 怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計4495字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 克山县| 临泽县| 孟州市| 施秉县| 仙游县| 英山县| 兴安县| 来宾市| 成都市| 梁山县| 开原市| 景宁| 广水市| 辽中县| 霍城县| 潜山县| 绍兴县| 贵州省| 阳原县| 茂名市| 旅游| 巫山县| 云阳县| 温泉县| 新乡县| 昌都县| 长武县| 石嘴山市| 凤阳县| 交城县| 冀州市| 大同县| 宁夏| 通州市| 台前县| 滦平县| 思南县| 桐柏县| 乃东县| 抚顺市| 洛隆县|