共計(jì) 7120 個(gè)字符,預(yù)計(jì)需要花費(fèi) 18 分鐘才能閱讀完成。
本篇內(nèi)容介紹了“KAFKA 是如何處理延時(shí)任務(wù)的”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓丸趣 TV 小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
一、kafka 服務(wù)端大概有哪些延時(shí)任務(wù)?
首先,我們需要了解一下 kafka 中大概有哪些需要延時(shí)的任務(wù),該怎么查看呢?
很簡(jiǎn)單,kafka 的設(shè)計(jì)都是基于接口的,那么我們只需要找到延時(shí)任務(wù)的頂層接口,然后看一下該接口有哪些實(shí)現(xiàn)類就知道有哪些延時(shí)任務(wù)了。
頂層抽象類接口是:DelayedOperation
對(duì)應(yīng)的子類:
DelayedHeartbeat:就是用于做消費(fèi)者心跳超時(shí)檢測(cè)的;
DelayedProduce:就是做生產(chǎn)者設(shè)置 ack=- 1 時(shí)需要等待所有副本確認(rèn)寫入成功的;
DelayedFetch:就是在消費(fèi)的時(shí)候該分區(qū)沒有數(shù)據(jù),需要去做延時(shí)等待;
DelayedJoin:就是去做消費(fèi)者加組的時(shí)候,在 JOIN 階段需要延時(shí)等待。
二、kafka 里面的延時(shí)任務(wù)是如何實(shí)現(xiàn)的呢?
這個(gè)答案已經(jīng)在標(biāo)題中就已經(jīng)回答了,就是時(shí)間輪。
那么時(shí)間輪在 kafka 中是如何實(shí)現(xiàn)的呢?
kafka 中的時(shí)間輪本體是一個(gè) 20 長(zhǎng)度數(shù)組,不過內(nèi)部持有上層數(shù)組的一個(gè)引用,數(shù)組中每個(gè)元素都是一個(gè) List,存放處于這個(gè)時(shí)間段的所有任務(wù)。
最后將這些有任務(wù)的 List 引用,放入 DelayQueue 來實(shí)現(xiàn)時(shí)間的流動(dòng),每次從 DelayQueue 中取出到期的 List 進(jìn)行對(duì)應(yīng)的操作。
翻譯一下:
就是原本把所有延時(shí)任務(wù)都一股腦全部放入 DelayQueue 中,實(shí)在是太多了,由于 DelayQueue 底層數(shù)據(jù)結(jié)構(gòu)是小頂堆,插入和刪除的時(shí)間復(fù)雜度都是 O(nlog(n)),
n 代表的具體任務(wù)的數(shù)量,當(dāng) n 值非常大時(shí),對(duì)應(yīng)的性能就很差,不能滿足一個(gè)高性能中間件的要求。于是就想了個(gè)辦法減小 n 的個(gè)數(shù),
就是把原來的一個(gè)個(gè)延時(shí)任務(wù),通過時(shí)間區(qū)間來封裝成一個(gè) List,把 List 作為一個(gè)基本單位存入到 DelayQueue 中,那么這一樣一來,就能把插入和刪除的時(shí)間復(fù)雜度
從 O(nlog(n)) 降低到接近 O(1)[這里為什么是近似 O(1)呢?你可以理解為時(shí)間輪是一個(gè)類 hash 表的結(jié)構(gòu)],除此之外,最重要的就是大大減小了 DelayQueue 中元素的個(gè)數(shù) n,
因?yàn)橐粚訒r(shí)間輪就 20 個(gè) List,10 層也就才 200 個(gè),所以對(duì)于這么小數(shù)量的元素個(gè)數(shù),DelayQueue 是完全能 hold 的住的。
總結(jié)一下:
其實(shí)時(shí)間輪的設(shè)計(jì)思想就是批處理的思想,把一批任務(wù)根據(jù)時(shí)間區(qū)間封裝成一個(gè) List,最后把 List 放到 DelayQueue 中去實(shí)現(xiàn)輪轉(zhuǎn)的效果。
優(yōu)化點(diǎn)主要是兩個(gè),一個(gè)是插入 / 刪除的時(shí)間復(fù)雜度由 O(nlog(n))降低到了近似 O(1),第二個(gè)是大大減小了 DelayQueue 元素的個(gè)數(shù)。
了解設(shè)計(jì)思想,我們?cè)倏纯磳?shí)現(xiàn)原理:
1、核心函數(shù):加入 Task 到時(shí)間輪中
分為三步:
如果任務(wù)已經(jīng)超期就返回 false
如果任務(wù)在自己的時(shí)間跨度內(nèi),就計(jì)算應(yīng)該放入哪個(gè)桶中(在哪個(gè)時(shí)間區(qū)間);如果桶沒在 DelayQueue 中則加入到 DelayQueue 中去。
如果任務(wù)的超時(shí)時(shí)間超過了自己的時(shí)間跨度,就往上層時(shí)間傳,直到找到一個(gè)滿足時(shí)間跨度的時(shí)間輪。
def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { // 被取消 // Cancelled false } else if (expiration currentTime + tickMs) { // 已經(jīng)過期 // Already expired false } else if (expiration currentTime + interval) { // 在有效期內(nèi) // Put in its own bucket val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)// Set the bucket expiration time // 設(shè)置超時(shí)時(shí)間,如果該桶已經(jīng)設(shè)置了超時(shí)時(shí)間則說明已經(jīng)存在于 DelayQueue 中了 // 如果不存在超時(shí)時(shí)間,則需要將當(dāng)前桶加入 DelayQueue 中 if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) }true } else { // 超過了當(dāng)前層時(shí)間輪的時(shí)間跨度 需要向上層時(shí)間輪傳遞,如果上層不存在則新建 // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)
}}
2、時(shí)間輪如何推進(jìn)?
每一個(gè) DelayedOperationPurgatory,都有一個(gè)線程 expirationReaper,去負(fù)責(zé)推進(jìn)時(shí)間輪,如果當(dāng)前沒有 task 到期就掛起 200ms 等待。
如果有 task 到期,就取出對(duì)應(yīng)的桶,然后將桶中的數(shù)據(jù)全都執(zhí)行 reinsert,也就是從最底層的時(shí)間輪重新執(zhí)行一遍 add 操作。
/** * A background reaper to expire delayed operations that have timed out */private class ExpiredOperationReaper extends ShutdownableThread( ExpirationReaper-%d-%s .format(brokerId, purgatoryName), false) { override def doWork() { advanceClock(200L) }
}
def advanceClock(timeoutMs: Long): Boolean = { // 從延時(shí)隊(duì)列中取出到期的桶 var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {writeLock.lock()try { // 一次性把到期的全部取出來 while (bucket != null) { // 時(shí)間輪的時(shí)間推進(jìn) timingWheel.advanceClock(bucket.getExpiration())// 把桶中的所有數(shù)據(jù)都拿去執(zhí)行 reinsert 函數(shù) // 本質(zhì)就是去執(zhí)行 addTimerTaskEntry(timerTaskEntry) bucket.flush(reinsert)bucket = delayQueue.poll()
}
} finally { writeLock.unlock()
}true } else {false }
}
3、到期的任務(wù)如何執(zhí)行?
其實(shí)就是接著上面的源碼,當(dāng)任務(wù)到期之后,reinsert 函數(shù)會(huì)返回 false,代表已經(jīng)超期 / 被取消了,每個(gè) DelayedOperationPurgatory 又有一個(gè)單線程的 taskExecutor,
超期的任務(wù)就提交到線程池中去執(zhí)行即可。
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) = addTimerTaskEntry(timerTaskEntry)
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { // 如果時(shí)間輪添加返回 false 則說明超期 / 被取消了,直接提交到自己的單線程線程池中去執(zhí)行該 task if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelled if (!timerTaskEntry.cancelled) taskExecutor.submit(timerTaskEntry.timerTask) }
}
4、整個(gè)流程的運(yùn)行圖
整個(gè)流程概括下來,就是業(yè)務(wù)代碼想 TimingWheel 執(zhí)行 add,提交任務(wù);
TimingWheel 找到合適的時(shí)間輪后插入對(duì)應(yīng)的桶中,并將桶放入 DelayQueue 中;
DelayedOperationPurgatory 組件中存在收割線程,去不停從 DelayQueue 中 poll 對(duì)應(yīng)到期的 task;
最后 task 重新執(zhí)行 reinsert,如果超期了就提交到 taskExecutor 中去執(zhí)行對(duì)應(yīng)的業(yè)務(wù) handler 邏輯。
三、相比于時(shí)間輪,為什么不采用 DelayQueue 來實(shí)現(xiàn)延時(shí)任務(wù)呢?
這個(gè)答案在第二小節(jié)的時(shí)候其實(shí)已經(jīng)給出的,在這里進(jìn)行一個(gè)總結(jié):
1、DelayQueue 底層數(shù)據(jù)結(jié)構(gòu)是小頂堆,插入和刪除的時(shí)間復(fù)雜度都是 O(nlog(n)),因此面對(duì)大量的延時(shí)操作時(shí),該結(jié)構(gòu)無法滿足 kafka 高性能的要求。
2、時(shí)間輪采用批處理的思想將任務(wù)按照區(qū)間進(jìn)行封裝,形成一類類似 hash 表的結(jié)構(gòu),讓插入 / 刪除的時(shí)間復(fù)雜度降低為 O(1),并且大大減小了 DelayQueue 元素的個(gè)數(shù)。
另外補(bǔ)充一點(diǎn),kafka 中每一種延時(shí)場(chǎng)景都會(huì)創(chuàng)建單獨(dú)的時(shí)間輪,一個(gè)時(shí)間輪里只存放一種類型的延時(shí)任務(wù),因?yàn)椴煌?Task 在超期 / 完成的時(shí)候需要執(zhí)行的邏輯是不一樣的,
需要一一對(duì)應(yīng)去執(zhí)行。舉個(gè)栗子,心跳延時(shí)場(chǎng)景有自己的 heartbeatPurgatory,生產(chǎn)延時(shí)有自己的 delayedProducePurgatory,以此類推。
四、延時(shí)案例分析 —— 消費(fèi)者心跳的維護(hù)
1、HEARTBEAT 請(qǐng)求的處理
從源碼中我們可以知道,心跳的維護(hù)和會(huì)話的超時(shí),kafka 的實(shí)現(xiàn)非常巧妙。
通常情況下,心跳 3s 發(fā)一次,session 超時(shí)時(shí)間是 10s;
kafka 就在收到 HEARTBEAT 請(qǐng)求之后,就先創(chuàng)建一個(gè) DelayedHeartbeat 延時(shí)任務(wù),超時(shí)時(shí)間就是對(duì)應(yīng)的 session.timeout 值即 10s;
如果在 10s 內(nèi)又收到了對(duì)應(yīng) consumer 的 HEARTBEAT 請(qǐng)求,就將上次提交的延時(shí)任務(wù)完成;
如果在 10s 內(nèi)沒有收到對(duì)應(yīng) consumer 的 HEARTBEAT 請(qǐng)求,則任務(wù) consumer 出問題了,就去執(zhí)行對(duì)應(yīng)的超期邏輯。
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) { // complete current heartbeat expectation member.latestHeartbeat = time.milliseconds() val memberKey = MemberKey(member.groupId, member.memberId) // 完成上次的延時(shí)任務(wù) heartbeatPurgatory.checkAndComplete(memberKey) // reschedule the next heartbeat expiration deadline // 服務(wù)端能拿到這個(gè) session.timeout, 然后根據(jù)這個(gè)時(shí)間生成一個(gè)延時(shí)任務(wù), // 例如 30s,如果這么長(zhǎng)時(shí)間么有收到心跳請(qǐng)求,則認(rèn)為消費(fèi)者出了問題,就踢掉以后執(zhí)行 rebalance。 val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout, Some(group.lock)) { override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _) override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) override def onComplete() = coordinator.onCompleteHeartbeat()
}
2、DelayedHeartbeat 任務(wù)超期后的邏輯
這一塊也很簡(jiǎn)單,就是去執(zhí)行 coordinator.onExpireHeartbeat 函數(shù),
具體邏輯就是打印一個(gè)標(biāo)識(shí)日志:Member xxx has failed,這個(gè)日志為什么要單獨(dú)講呢?因?yàn)檫@個(gè)是我們排查消費(fèi)者問題的時(shí)候的核心日志;
我們?cè)诳?server.log 的時(shí)候,如果查到某個(gè)消費(fèi)組的消費(fèi)者出現(xiàn)這個(gè)日志,那么我們就能肯定這個(gè)消費(fèi)組的這個(gè)消費(fèi)者是因?yàn)闀?huì)話超時(shí)的原因被剔除了;
從而我們就可以繼續(xù)往下分析這個(gè)消費(fèi)者掉線的原因是因?yàn)橄M(fèi)者進(jìn)程掛了?或者是客戶端機(jī)器負(fù)載太高而心跳線程是守護(hù)線程優(yōu)先級(jí)比較低拿不到 CPU 資源?
等等一系列定位線索。這個(gè)日志主要是用于定位消費(fèi)者出問題,以及消費(fèi)組 rebalance 原因的,是非常重要的一個(gè)標(biāo)識(shí)日志!
講完日志,我們就可以看到后續(xù)就是去開啟 rebalance,因?yàn)橄M(fèi)者個(gè)數(shù)變了,需要重新去進(jìn)行分區(qū)分配,已經(jīng)故障轉(zhuǎn)移。
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { group.inLock {if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { // 標(biāo)識(shí)日志 info(s Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group )
removeMemberAndUpdateGroup(group, member)}
}
}
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) { group.remove(member.memberId)
group.currentState match {case Dead | Empty = case Stable | CompletingRebalance = maybePrepareRebalance(group) case PreparingRebalance = joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
“KAFKA 是如何處理延時(shí)任務(wù)的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注丸趣 TV 網(wǎng)站,丸趣 TV 小編將為大家輸出更多高質(zhì)量的實(shí)用文章!