久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

KAFKA是如何處理延時(shí)任務(wù)的

共計(jì) 7120 個(gè)字符,預(yù)計(jì)需要花費(fèi) 18 分鐘才能閱讀完成。

本篇內(nèi)容介紹了“KAFKA 是如何處理延時(shí)任務(wù)的”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓丸趣 TV 小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

一、kafka 服務(wù)端大概有哪些延時(shí)任務(wù)?

首先,我們需要了解一下 kafka 中大概有哪些需要延時(shí)的任務(wù),該怎么查看呢?
很簡(jiǎn)單,kafka 的設(shè)計(jì)都是基于接口的,那么我們只需要找到延時(shí)任務(wù)的頂層接口,然后看一下該接口有哪些實(shí)現(xiàn)類就知道有哪些延時(shí)任務(wù)了。
頂層抽象類接口是:DelayedOperation
對(duì)應(yīng)的子類:

DelayedHeartbeat:就是用于做消費(fèi)者心跳超時(shí)檢測(cè)的;
DelayedProduce:就是做生產(chǎn)者設(shè)置 ack=- 1 時(shí)需要等待所有副本確認(rèn)寫入成功的;
DelayedFetch:就是在消費(fèi)的時(shí)候該分區(qū)沒有數(shù)據(jù),需要去做延時(shí)等待;
DelayedJoin:就是去做消費(fèi)者加組的時(shí)候,在 JOIN 階段需要延時(shí)等待。

二、kafka 里面的延時(shí)任務(wù)是如何實(shí)現(xiàn)的呢?

這個(gè)答案已經(jīng)在標(biāo)題中就已經(jīng)回答了,就是時(shí)間輪。
那么時(shí)間輪在 kafka 中是如何實(shí)現(xiàn)的呢?

kafka 中的時(shí)間輪本體是一個(gè) 20 長(zhǎng)度數(shù)組,不過內(nèi)部持有上層數(shù)組的一個(gè)引用,數(shù)組中每個(gè)元素都是一個(gè) List,存放處于這個(gè)時(shí)間段的所有任務(wù)。
最后將這些有任務(wù)的 List 引用,放入 DelayQueue 來實(shí)現(xiàn)時(shí)間的流動(dòng),每次從 DelayQueue 中取出到期的 List 進(jìn)行對(duì)應(yīng)的操作。
翻譯一下:
就是原本把所有延時(shí)任務(wù)都一股腦全部放入 DelayQueue 中,實(shí)在是太多了,由于 DelayQueue 底層數(shù)據(jù)結(jié)構(gòu)是小頂堆,插入和刪除的時(shí)間復(fù)雜度都是 O(nlog(n)),
n 代表的具體任務(wù)的數(shù)量,當(dāng) n 值非常大時(shí),對(duì)應(yīng)的性能就很差,不能滿足一個(gè)高性能中間件的要求。于是就想了個(gè)辦法減小 n 的個(gè)數(shù),
就是把原來的一個(gè)個(gè)延時(shí)任務(wù),通過時(shí)間區(qū)間來封裝成一個(gè) List,把 List 作為一個(gè)基本單位存入到 DelayQueue 中,那么這一樣一來,就能把插入和刪除的時(shí)間復(fù)雜度
從 O(nlog(n)) 降低到接近 O(1)[這里為什么是近似 O(1)呢?你可以理解為時(shí)間輪是一個(gè)類 hash 表的結(jié)構(gòu)],除此之外,最重要的就是大大減小了 DelayQueue 中元素的個(gè)數(shù) n,
因?yàn)橐粚訒r(shí)間輪就 20 個(gè) List,10 層也就才 200 個(gè),所以對(duì)于這么小數(shù)量的元素個(gè)數(shù),DelayQueue 是完全能 hold 的住的。
總結(jié)一下:
其實(shí)時(shí)間輪的設(shè)計(jì)思想就是批處理的思想,把一批任務(wù)根據(jù)時(shí)間區(qū)間封裝成一個(gè) List,最后把 List 放到 DelayQueue 中去實(shí)現(xiàn)輪轉(zhuǎn)的效果。
優(yōu)化點(diǎn)主要是兩個(gè),一個(gè)是插入 / 刪除的時(shí)間復(fù)雜度由 O(nlog(n))降低到了近似 O(1),第二個(gè)是大大減小了 DelayQueue 元素的個(gè)數(shù)。

了解設(shè)計(jì)思想,我們?cè)倏纯磳?shí)現(xiàn)原理:
 

1、核心函數(shù):加入 Task 到時(shí)間輪中
分為三步:

如果任務(wù)已經(jīng)超期就返回 false

如果任務(wù)在自己的時(shí)間跨度內(nèi),就計(jì)算應(yīng)該放入哪個(gè)桶中(在哪個(gè)時(shí)間區(qū)間);如果桶沒在 DelayQueue 中則加入到 DelayQueue 中去。

如果任務(wù)的超時(shí)時(shí)間超過了自己的時(shí)間跨度,就往上層時(shí)間傳,直到找到一個(gè)滿足時(shí)間跨度的時(shí)間輪。

def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { //  被取消  // Cancelled false } else if (expiration   currentTime + tickMs) { //  已經(jīng)過期  // Already expired false } else if (expiration   currentTime + interval) { //  在有效期內(nèi)  // Put in its own bucket val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt)
 bucket.add(timerTaskEntry)// Set the bucket expiration time //  設(shè)置超時(shí)時(shí)間,如果該桶已經(jīng)設(shè)置了超時(shí)時(shí)間則說明已經(jīng)存在于 DelayQueue 中了  //  如果不存在超時(shí)時(shí)間,則需要將當(dāng)前桶加入 DelayQueue 中  if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) }true } else { //  超過了當(dāng)前層時(shí)間輪的時(shí)間跨度   需要向上層時(shí)間輪傳遞,如果上層不存在則新建  // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)
 }}

2、時(shí)間輪如何推進(jìn)?
每一個(gè) DelayedOperationPurgatory,都有一個(gè)線程 expirationReaper,去負(fù)責(zé)推進(jìn)時(shí)間輪,如果當(dāng)前沒有 task 到期就掛起 200ms 等待。
如果有 task 到期,就取出對(duì)應(yīng)的桶,然后將桶中的數(shù)據(jù)全都執(zhí)行 reinsert,也就是從最底層的時(shí)間輪重新執(zhí)行一遍 add 操作。

/** * A background reaper to expire delayed operations that have timed out */private class ExpiredOperationReaper extends ShutdownableThread(  ExpirationReaper-%d-%s .format(brokerId, purgatoryName), false) { override def doWork() { advanceClock(200L) }
}
def advanceClock(timeoutMs: Long): Boolean = { //  從延時(shí)隊(duì)列中取出到期的桶  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
 if (bucket != null) {writeLock.lock()try { //  一次性把到期的全部取出來  while (bucket != null) { //  時(shí)間輪的時(shí)間推進(jìn)  timingWheel.advanceClock(bucket.getExpiration())//  把桶中的所有數(shù)據(jù)都拿去執(zhí)行 reinsert 函數(shù)  //  本質(zhì)就是去執(zhí)行 addTimerTaskEntry(timerTaskEntry) bucket.flush(reinsert)bucket = delayQueue.poll()
 }
 } finally { writeLock.unlock()
 }true } else {false }
}

3、到期的任務(wù)如何執(zhí)行?
其實(shí)就是接著上面的源碼,當(dāng)任務(wù)到期之后,reinsert 函數(shù)會(huì)返回 false,代表已經(jīng)超期 / 被取消了,每個(gè) DelayedOperationPurgatory 又有一個(gè)單線程的 taskExecutor,
超期的任務(wù)就提交到線程池中去執(zhí)行即可。

private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) =  addTimerTaskEntry(timerTaskEntry)
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { //  如果時(shí)間輪添加返回 false 則說明超期 / 被取消了,直接提交到自己的單線程線程池中去執(zhí)行該 task if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelled if (!timerTaskEntry.cancelled) taskExecutor.submit(timerTaskEntry.timerTask) }
}

4、整個(gè)流程的運(yùn)行圖
整個(gè)流程概括下來,就是業(yè)務(wù)代碼想 TimingWheel 執(zhí)行 add,提交任務(wù);
TimingWheel 找到合適的時(shí)間輪后插入對(duì)應(yīng)的桶中,并將桶放入 DelayQueue 中;
DelayedOperationPurgatory 組件中存在收割線程,去不停從 DelayQueue 中 poll 對(duì)應(yīng)到期的 task;
最后 task 重新執(zhí)行 reinsert,如果超期了就提交到 taskExecutor 中去執(zhí)行對(duì)應(yīng)的業(yè)務(wù) handler 邏輯。
 

三、相比于時(shí)間輪,為什么不采用 DelayQueue 來實(shí)現(xiàn)延時(shí)任務(wù)呢?

這個(gè)答案在第二小節(jié)的時(shí)候其實(shí)已經(jīng)給出的,在這里進(jìn)行一個(gè)總結(jié):
1、DelayQueue 底層數(shù)據(jù)結(jié)構(gòu)是小頂堆,插入和刪除的時(shí)間復(fù)雜度都是 O(nlog(n)),因此面對(duì)大量的延時(shí)操作時(shí),該結(jié)構(gòu)無法滿足 kafka 高性能的要求。
2、時(shí)間輪采用批處理的思想將任務(wù)按照區(qū)間進(jìn)行封裝,形成一類類似 hash 表的結(jié)構(gòu),讓插入 / 刪除的時(shí)間復(fù)雜度降低為 O(1),并且大大減小了 DelayQueue 元素的個(gè)數(shù)。

另外補(bǔ)充一點(diǎn),kafka 中每一種延時(shí)場(chǎng)景都會(huì)創(chuàng)建單獨(dú)的時(shí)間輪,一個(gè)時(shí)間輪里只存放一種類型的延時(shí)任務(wù),因?yàn)椴煌?Task 在超期 / 完成的時(shí)候需要執(zhí)行的邏輯是不一樣的,
需要一一對(duì)應(yīng)去執(zhí)行。舉個(gè)栗子,心跳延時(shí)場(chǎng)景有自己的 heartbeatPurgatory,生產(chǎn)延時(shí)有自己的 delayedProducePurgatory,以此類推。

四、延時(shí)案例分析 —— 消費(fèi)者心跳的維護(hù)

1、HEARTBEAT 請(qǐng)求的處理
從源碼中我們可以知道,心跳的維護(hù)和會(huì)話的超時(shí),kafka 的實(shí)現(xiàn)非常巧妙。
通常情況下,心跳 3s 發(fā)一次,session 超時(shí)時(shí)間是 10s;
kafka 就在收到 HEARTBEAT 請(qǐng)求之后,就先創(chuàng)建一個(gè) DelayedHeartbeat 延時(shí)任務(wù),超時(shí)時(shí)間就是對(duì)應(yīng)的 session.timeout 值即 10s;
如果在 10s 內(nèi)又收到了對(duì)應(yīng) consumer 的 HEARTBEAT 請(qǐng)求,就將上次提交的延時(shí)任務(wù)完成;
如果在 10s 內(nèi)沒有收到對(duì)應(yīng) consumer 的 HEARTBEAT 請(qǐng)求,則任務(wù) consumer 出問題了,就去執(zhí)行對(duì)應(yīng)的超期邏輯。

private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) { // complete current heartbeat expectation member.latestHeartbeat = time.milliseconds() val memberKey = MemberKey(member.groupId, member.memberId) //  完成上次的延時(shí)任務(wù)  heartbeatPurgatory.checkAndComplete(memberKey) // reschedule the next heartbeat expiration deadline //  服務(wù)端能拿到這個(gè) session.timeout, 然后根據(jù)這個(gè)時(shí)間生成一個(gè)延時(shí)任務(wù), //  例如 30s,如果這么長(zhǎng)時(shí)間么有收到心跳請(qǐng)求,則認(rèn)為消費(fèi)者出了問題,就踢掉以后執(zhí)行 rebalance。 val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
 heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout, Some(group.lock)) { override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _) override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) override def onComplete() = coordinator.onCompleteHeartbeat()
}

2、DelayedHeartbeat 任務(wù)超期后的邏輯
這一塊也很簡(jiǎn)單,就是去執(zhí)行 coordinator.onExpireHeartbeat 函數(shù),
具體邏輯就是打印一個(gè)標(biāo)識(shí)日志:Member  xxx has failed,這個(gè)日志為什么要單獨(dú)講呢?因?yàn)檫@個(gè)是我們排查消費(fèi)者問題的時(shí)候的核心日志;
我們?cè)诳?server.log 的時(shí)候,如果查到某個(gè)消費(fèi)組的消費(fèi)者出現(xiàn)這個(gè)日志,那么我們就能肯定這個(gè)消費(fèi)組的這個(gè)消費(fèi)者是因?yàn)闀?huì)話超時(shí)的原因被剔除了;
從而我們就可以繼續(xù)往下分析這個(gè)消費(fèi)者掉線的原因是因?yàn)橄M(fèi)者進(jìn)程掛了?或者是客戶端機(jī)器負(fù)載太高而心跳線程是守護(hù)線程優(yōu)先級(jí)比較低拿不到 CPU 資源?
等等一系列定位線索。這個(gè)日志主要是用于定位消費(fèi)者出問題,以及消費(fèi)組 rebalance 原因的,是非常重要的一個(gè)標(biāo)識(shí)日志!

講完日志,我們就可以看到后續(xù)就是去開啟 rebalance,因?yàn)橄M(fèi)者個(gè)數(shù)變了,需要重新去進(jìn)行分區(qū)分配,已經(jīng)故障轉(zhuǎn)移。

def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { group.inLock {if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { //  標(biāo)識(shí)日志  info(s Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group )
 removeMemberAndUpdateGroup(group, member)}
 }
}
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) { group.remove(member.memberId)
 group.currentState match {case Dead | Empty =  case Stable | CompletingRebalance =  maybePrepareRebalance(group) case PreparingRebalance =  joinPurgatory.checkAndComplete(GroupKey(group.groupId))
 }
}

“KAFKA 是如何處理延時(shí)任務(wù)的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注丸趣 TV 網(wǎng)站,丸趣 TV 小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-25發(fā)表,共計(jì)7120字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒有評(píng)論)
主站蜘蛛池模板: 儋州市| 家居| 六安市| 金阳县| 丰都县| 湖北省| 堆龙德庆县| 昌吉市| 任丘市| 灵山县| 平潭县| 彩票| 大渡口区| 施秉县| 阜阳市| 达尔| 衡阳县| 井陉县| 新安县| 墨脱县| 阿拉善右旗| 南郑县| 平泉县| 阳东县| 新疆| 于都县| 桓台县| 荔浦县| 镇平县| 思茅市| 定安县| 南昌县| 营口市| 武定县| 南丰县| 六枝特区| 大新县| 集安市| 睢宁县| 桃园县| 乌鲁木齐县|