久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

KAFKA是如何處理粘包拆包的

165次閱讀
沒有評論

共計 4692 個字符,預計需要花費 12 分鐘才能閱讀完成。

本篇內(nèi)容主要講解“KAFKA 是如何處理粘包拆包的”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“KAFKA 是如何處理粘包拆包的”吧!

一、為什么會出現(xiàn)粘包拆包現(xiàn)象?

我們知道,TCP 數(shù)據(jù)包都是按照協(xié)議進行拆包、編號然后分批發(fā)送的;
那么對應我們應用層有意義的數(shù)據(jù)包,傳輸層的協(xié)議并不了解其含義,更不會去根據(jù)你的業(yè)務內(nèi)容去分包和發(fā)送,只會按照自己的協(xié)議棧去進行數(shù)據(jù)發(fā)送。
因此,就出現(xiàn)了網(wǎng)絡數(shù)據(jù)的粘包,拆包問題。
究其本質(zhì),其實就是傳輸層并不了解上層應用的數(shù)據(jù)含義,只會按照協(xié)議棧進行數(shù)據(jù)發(fā)送。

二、通常有哪些解決粘包拆包問題的方法?

在了解出現(xiàn)這個問題的本質(zhì)后,那么要想解決這個問題就很簡單了。
不就是在進行數(shù)據(jù)接收的時候,我們應用層收到數(shù)據(jù)后根據(jù)標識判斷一下,數(shù)據(jù)是否完整,如果完整了我們再進行數(shù)據(jù)包解析,最后交給業(yè)務代碼不就好了?
通常解決粘包拆包的問題有三種方案:

定長,例如我保證我每一條數(shù)據(jù)都是 200b,那么我每接收到 200b 就認為是一條完整的數(shù)據(jù),接著就可以進行解析,并向業(yè)務代碼交付。

分隔符,一樣的意思,我每條數(shù)據(jù)末尾都用一個分隔符例如換行符,制表符這種來標識這條數(shù)據(jù)寫完了,那么我們收到數(shù)據(jù)判找一下這個分割符在哪兒,最后進行切割就可以得到完整的數(shù)據(jù)包了。

自定義協(xié)議,這個也很簡單,就是定義一個你的完整數(shù)據(jù)包的內(nèi)容格式是什么樣子的,例如 len + data,其中 len 是代表 data 的字節(jié)長度。這樣每次根據(jù)前面 4 個字節(jié)的 len,就能得到后面還需要多少數(shù)據(jù)才是一條完整的數(shù)據(jù),少了就等,多了就截取。

最后,可能很多不熟悉網(wǎng)絡編程的同學會納悶,那萬一 TCP 的數(shù)據(jù)包丟失了,亂序了,上面這種方法不就出問題了嘛?
其實不是的,TCP 一個可靠的消息傳輸協(xié)議,其協(xié)議的根本思想就是提供可靠的數(shù)據(jù)傳輸服務。
翻譯一下就是,你可以相信 TCP 傳輸?shù)臄?shù)據(jù)是可靠的,在交付給應用層數(shù)據(jù)的時候,是不會出現(xiàn)上述這種情況的。
出現(xiàn)這種情況只會在傳輸層出現(xiàn),而 TCP 協(xié)議也為對應的情況設計了分批、編號、去重、校驗和、超時重傳等一系列的操作,來保證數(shù)據(jù)可靠。

三、kakfa 是如何解決粘包拆包問題的呢?

最后,讓我們來看下 kafka 是如何解決粘包拆包問題的呢?是以上面提到的哪種方式來解決的呢?
首先看粘包,也就是接收到了多余的數(shù)據(jù),該如何拆分數(shù)據(jù)包,讀取到正確完整的數(shù)據(jù)包?
如下面代碼所示,分為三個階段:

先讀取前 4 字節(jié),轉(zhuǎn)換為一個 int,即長度。

根據(jù)長度申請內(nèi)存 buffer。

最后讀取指定大小的數(shù)據(jù)到申請好的 buffer

由此,就完整了一整條數(shù)據(jù)的正確讀取。整個過程其實就是上面提到的 len+data 這么一個簡單的自定義協(xié)議。

public NetworkReceive read() throws IOException { NetworkReceive result = null; //  新建一個 receive if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } //  真正的數(shù)據(jù) read receive(receive); //  數(shù)據(jù)讀取完成的后置操作  if (receive.complete()) { //  倒帶,等待讀 receive.payload().rewind(); //  直接引用賦值  result = receive; //  最后清空當前引用,然后等待下次進入 read 的時候,執(zhí)行 new  操作  receive = null; } else if (receive.requiredMemoryAmountKnown()   !receive.memoryAllocated()   isInMutableState()) {//pool must be out of memory, mute ourselves. mute(); }return result;}
public long readFrom(ScatteringByteChannel channel) throws IOException {int read = 0;
 //  存在數(shù)據(jù) if (size.hasRemaining()) { // len + dataint bytesRead = channel.read(size); if (bytesRead   0)throw new EOFException(); read += bytesRead;
 //  如果讀滿了長度,則直接倒帶得到具體的 len 值
 //  這里的 size 是一個 byteBuffer 類型的,也就是接收到的數(shù)據(jù)  if (!size.hasRemaining()) {size.rewind(); int receiveSize = size.getInt(); if (receiveSize   0)throw new InvalidReceiveException(Invalid receive (size =   + receiveSize + )  if (maxSize != UNLIMITED   receiveSize   maxSize)throw new InvalidReceiveException(Invalid receive (size =   + receiveSize +   larger than   + maxSize + )  requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL) if (receiveSize == 0) {buffer = EMPTY_BUFFER; }
 }
 } //  如果長度已經(jīng)就緒了,那么就需要接下來的 data 需要多少空間,在這里進行申請 if (buffer == null   requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet buffer = memoryPool.tryAllocate(requestedBufferSize); if (buffer == null)log.trace(Broker low on memory - could not allocate buffer of size {} for source {} , requestedBufferSize, source); }
 //  申請完畢之后,就調(diào)用 read 函數(shù),直接 read 出來即可。if (buffer != null) { int bytesRead = channel.read(buffer); if (bytesRead   0)throw new EOFException(); read += bytesRead; } //  返回讀取的總字節(jié)數(shù) return read;}

再先看拆包,也就是接收到數(shù)據(jù)不夠組成一條完整的數(shù)據(jù),該如何等待完整的數(shù)據(jù)包?
下面代碼最核心的就是 receive.complete() 函數(shù)的判斷邏輯,這個判斷的三個條件分別意味著:

!size.hasRemaining():接收到的 buffer 數(shù)據(jù)已經(jīng)讀取完成。

buffer != null:buffer 已經(jīng)創(chuàng)建。

!buffer.hasRemaining():buffer 已經(jīng)讀取完成。

翻譯一下,其實就是只要一條數(shù)據(jù)沒讀完整,那么 receive.complete() 函數(shù)返回值就是 false,那么最終返回的結(jié)果就是 null,等待下一次 OP_READ 事件的時候再接著上次沒讀完的數(shù)據(jù)讀取,直到讀取一條完整的數(shù)據(jù)為止。

public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool); }
 receive(receive); if (receive.complete()) {receive.payload().rewind(); result = receive; receive = null; } else if (receive.requiredMemoryAmountKnown()   !receive.memoryAllocated()   isInMutableState()) {//pool must be out of memory, mute ourselves. mute(); }return result;}
public boolean complete() { return !size.hasRemaining()   buffer != null   !buffer.hasRemaining();}

最后,我們再補充一點,當我們一次性收到很多條數(shù)據(jù)的時候,會如何處理呢?
下面的源碼告訴了我們答案,就是一次性全部讀取出來,然后存入 stageReceives 這個數(shù)據(jù)結(jié)構(gòu)中等待下一步業(yè)務處理。

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {//if channel is ready and has bytes to read from socket or buffer, and has no //previous receive(s) already staged or otherwise in progress then read from it if (channel.ready()   (key.isReadable() || channel.hasBytesBuffered())   !hasStagedReceive(channel)
   !explicitlyMutedChannels.contains(channel)) { NetworkReceive networkReceive; //  一次性讀取所有的 receives,暫存到 stageReceives 中  while ((networkReceive = channel.read()) != null) { madeReadProgressLastPoll = true; addToStagedReceives(channel, networkReceive); }// isMute 是判斷當前 channel 是否關注了 OP_READ 事件  if (channel.isMute()) {outOfMemory = true; //channel has muted itself due to memory pressure. } else {madeReadProgressLastPoll = true; }
 }
}

到此,相信大家對“KAFKA 是如何處理粘包拆包的”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-25發(fā)表,共計4692字。
轉(zhuǎn)載說明:除特殊說明外本站除技術相關以外文章皆由網(wǎng)絡搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 齐河县| 宁晋县| 舒城县| 土默特左旗| 海伦市| 鄂伦春自治旗| 临西县| 灵宝市| 那曲县| 布尔津县| 朝阳区| 丰都县| 邹平县| 石河子市| 黎城县| 兴文县| 天等县| 高邮市| 永宁县| 河南省| 白城市| 西和县| 通辽市| 镇远县| 卢龙县| 射阳县| 武平县| 石家庄市| 桂林市| 绥棱县| 黄大仙区| 巴东县| 铜鼓县| 绥中县| 香港 | 曲沃县| 高台县| 太白县| 固阳县| 团风县| 高阳县|