久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Redis中命令的原子性是什么

170次閱讀
沒有評論

共計(jì) 27378 個(gè)字符,預(yù)計(jì)需要花費(fèi) 69 分鐘才能閱讀完成。

這篇文章主要講解了“Redis 中命令的原子性是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Redis 中命令的原子性是什么”吧!

Redis 如何應(yīng)對并發(fā)訪問 Redis 中處理并發(fā)的方案

業(yè)務(wù)中有時(shí)候我們會用 Redis 處理一些高并發(fā)的業(yè)務(wù)場景,例如,秒殺業(yè)務(wù),對于庫存的操作。。。

先來分析下,并發(fā)場景下會發(fā)生什么問題

并發(fā)問題主要發(fā)生在數(shù)據(jù)的修改上,對于客戶端修改數(shù)據(jù),一般分成下面兩個(gè)步驟:

1、客戶端先把數(shù)據(jù)讀取到本地,在本地進(jìn)行修改;

2、客戶端修改完數(shù)據(jù)后,再寫回 Redis。

我們把這個(gè)流程叫做讀取 - 修改 - 寫回操作(Read-Modify-Write,簡稱為 RMW 操作)。如果客戶端并發(fā)進(jìn)行 RMW 操作的時(shí)候,就需要保證 讀取 - 修改 - 寫回是一個(gè)原子操作,進(jìn)行命令操作的時(shí)候,其他客戶端不能對當(dāng)前的數(shù)據(jù)進(jìn)行操作。

錯誤的栗子:

統(tǒng)計(jì)一個(gè)頁面的訪問次數(shù),每次刷新頁面訪問次數(shù) +1,這里使用 Redis 來記錄訪問次數(shù)。

如果每次的讀取 - 修改 - 寫回操作不是一個(gè)原子操作,那么就可能存在下圖的問題,客戶端 2 在客戶端 1 操作的中途,也獲取 Redis 的值,也對值進(jìn)行 +1,操作,這樣就導(dǎo)致最終數(shù)據(jù)的錯誤。

對于上面的這種情況,一般會有兩種方式解決:

1、使用 Redis 實(shí)現(xiàn)一把分布式鎖,通過鎖來保護(hù)每次只有一個(gè)線程來操作臨界資源;

2、實(shí)現(xiàn)操作命令的原子性。

栗如,對于上面的錯誤栗子,如果讀取 - 修改 - 寫回是一個(gè)原子性的命令,那么這個(gè)命令在操作過程中就不有別的線程同時(shí)讀取操作數(shù)據(jù),這樣就能避免上面栗子出現(xiàn)的問題。

下面從原子性和鎖兩個(gè)方面,具體分析下,對并發(fā)訪問問題的處理

原子性

為了實(shí)現(xiàn)并發(fā)控制要求的臨界區(qū)代碼互斥執(zhí)行,如果使用 Redis 中命令的原子性,可以有下面兩種處理方式:

1、借助于 Redis 中的原子性的單命令;

2、把多個(gè)操作寫到一個(gè) Lua 腳本中,以原子性方式執(zhí)行單個(gè) Lua 腳本。

在探討 Redis 原子性的時(shí)候,先來探討下 Redis 中使用到的編程模型

Redis 的編程模型

Redis 中使用到了 Reactor 模型,Reactor 是非阻塞 I/O 模型,這里來看下 Unix 中的 I/O 模型。

Unix 中的 I/O 模型

操作系統(tǒng)上的 I/O 是用戶空間和內(nèi)核空間的數(shù)據(jù)交互,因此 I/O 操作通常包含以下兩個(gè)步驟:

1、等待網(wǎng)絡(luò)數(shù)據(jù)到達(dá)網(wǎng)卡 (讀就緒)/ 等待網(wǎng)卡可寫 (寫就緒) – 讀取 / 寫入到內(nèi)核緩沖區(qū);

2、從內(nèi)核緩沖區(qū)復(fù)制數(shù)據(jù) – 用戶空間 (讀)/ 從用戶空間復(fù)制數(shù)據(jù) – 內(nèi)核緩沖區(qū) (寫);

Unix 中有五種基本的 I/O 模型

阻塞式 I/O;

非阻塞式 I/O;

I/O 多路復(fù)用;

信號驅(qū)動 I/O;

異步 I/O;

而判定一個(gè) I/O 模型是同步還是異步,主要看第二步:數(shù)據(jù)在用戶和內(nèi)核空間之間復(fù)制的時(shí)候是不是會阻塞當(dāng)前進(jìn)程,如果會,則是同步 I/O,否則,就是異步 I/O。

這里主要分下下面三種 I/O 模型

阻塞型 I/O;

當(dāng)用戶程序執(zhí)行 read,線程會被阻塞,一直等到內(nèi)核數(shù)據(jù)準(zhǔn)備好,并把數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到應(yīng)用程序的緩沖區(qū)中,當(dāng)拷貝過程完成,read 才會返回。

阻塞等待的是「內(nèi)核數(shù)據(jù)準(zhǔn)備好」和「數(shù)據(jù)從內(nèi)核態(tài)拷貝到用戶態(tài)」這兩個(gè)過程。

非阻塞同步 I/O;

非阻塞的 read 請求在數(shù)據(jù)未準(zhǔn)備好的情況下立即返回,可以繼續(xù)往下執(zhí)行,此時(shí)應(yīng)用程序不斷輪詢內(nèi)核,直到數(shù)據(jù)準(zhǔn)備好,內(nèi)核將數(shù)據(jù)拷貝到應(yīng)用程序緩沖區(qū),read 調(diào)用才可以獲取到結(jié)果。

這里最后一次 read 調(diào)用,獲取數(shù)據(jù)的過程,是一個(gè)同步的過程,是需要等待的過程。這里的同步指的是內(nèi)核態(tài)的數(shù)據(jù)拷貝到用戶程序的緩存區(qū)這個(gè)過程。

非阻塞異步 I/O;

發(fā)起異步 I/O,就立即返回,內(nèi)核自動將數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間,這個(gè)拷貝過程同樣是異步的,內(nèi)核自動完成的,和前面的同步操作不一樣,應(yīng)用程序并不需要主動發(fā)起拷貝動作。

舉個(gè)你去飯?zhí)贸燥埖睦樱愫帽葢?yīng)用程序,飯?zhí)煤帽炔僮飨到y(tǒng)。

阻塞 I/O 好比,你去飯?zhí)贸燥垼秋執(zhí)玫牟诉€沒做好,然后你就一直在那里等啊等,等了好長一段時(shí)間終于等到飯?zhí)冒⒁贪巡硕肆顺鰜恚〝?shù)據(jù)準(zhǔn)備的過程),但是你還得繼續(xù)等阿姨把菜(內(nèi)核空間)打到你的飯盒里(用戶空間),經(jīng)歷完這兩個(gè)過程,你才可以離開。

非阻塞 I/O 好比,你去了飯?zhí)茫瑔柊⒁滩俗龊昧藳]有,阿姨告訴你沒,你就離開了,過幾十分鐘,你又來飯?zhí)脝柊⒁蹋⒁陶f做好了,于是阿姨幫你把菜打到你的飯盒里,這個(gè)過程你是得等待的。

異步 I/O 好比,你讓飯?zhí)冒⒁虒⒉俗龊貌巡舜虻斤埡欣锖螅扬埡兴偷侥忝媲埃麄€(gè)過程你都不需要任何等待。

在 web 服務(wù)中,處理 web 請求通常有兩種體系結(jié)構(gòu),分別為:thread-based architecture(基于線程的架構(gòu))、event-driven architecture(事件驅(qū)動模型)

thread-based architecture(基于線程的架構(gòu))

thread-based architecture(基于線程的架構(gòu)):這種比較容易理解,就是多線程并發(fā)模式,服務(wù)端在處理請求的時(shí)候,一個(gè)請求分配一個(gè)獨(dú)立的線程來處理。

因?yàn)槊總€(gè)請求分配一個(gè)獨(dú)立的線程,所以單個(gè)線程的阻塞不會影響到其他的線程,能夠提高程序的響應(yīng)速度。

不足的是,連接和線程之間始終保持一對一的關(guān)系,如果是一直處于 Keep-Alive 狀態(tài)的長連接將會導(dǎo)致大量工作線程在空閑狀態(tài)下等待,例如,文件系統(tǒng)訪問,網(wǎng)絡(luò)等。此外,成百上千的連接還可能會導(dǎo)致并發(fā)線程浪費(fèi)大量內(nèi)存的堆棧空間。

event-driven architecture(事件驅(qū)動模型)

事件驅(qū)動的體系結(jié)構(gòu)由事件生產(chǎn)者和事件消費(fèi)者組,是一種松耦合、分布式的驅(qū)動架構(gòu),生產(chǎn)者收集到某應(yīng)用產(chǎn)生的事件后實(shí)時(shí)對事件采取必要的處理后路由至下游系統(tǒng),無需等待系統(tǒng)響應(yīng),下游的事件消費(fèi)者組收到是事件消息,異步的處理。

事件驅(qū)動架構(gòu)具有以下優(yōu)勢:

降低耦合;

降低事件生產(chǎn)者和訂閱者的耦合性。事件生產(chǎn)者只需關(guān)注事件的發(fā)生,無需關(guān)注事件如何處理以及被分發(fā)給哪些訂閱者。任何一個(gè)環(huán)節(jié)出現(xiàn)故障,不會影響其他業(yè)務(wù)正常運(yùn)行。

異步執(zhí)行;

事件驅(qū)動架構(gòu)適用于異步場景,即便是需求高峰期,收集各種來源的事件后保留在事件總線中,然后逐步分發(fā)傳遞事件,不會造成系統(tǒng)擁塞或資源過剩的情況。

可擴(kuò)展性;

事件驅(qū)動架構(gòu)中路由和過濾能力支持劃分服務(wù),便于擴(kuò)展和路由分發(fā)。

Reactor 模式和 Proactor 模式都是 event-driven architecture(事件驅(qū)動模型)的實(shí)現(xiàn)方式,這里具體分析下

Reactor 模式

Reactor 模式,是指通過一個(gè)或多個(gè)輸入同時(shí)傳遞給服務(wù)處理器的服務(wù)請求的事件驅(qū)動處理模式。

在處理?絡(luò) IO 的連接事件、讀事件、寫事件。Reactor 中引入了三類角色

reactor:監(jiān)聽和分配事件,連接事件交給 acceptor 處理,讀寫事件交給 handler 處理;

acceptor:接收連接請求,接收連接后,會創(chuàng)建 handler,處理網(wǎng)絡(luò)連接上對后續(xù)讀寫事件的處理;

handler:處理讀寫事件。

Reactor 模型又分為 3 類:

單線程 Reactor 模式;

建立連接(Acceptor)、監(jiān)聽 accept、read、write 事件(Reactor)、處理事件(Handler)都只用一個(gè)單線程;

多線程 Reactor 模式;

與單線程模式不同的是,添加了一個(gè)工作者線程池,并將非 I/O 操作從 Reactor 線程中移出轉(zhuǎn)交給工作者線程池(Thread Pool)來執(zhí)行。

建立連接(Acceptor)和 監(jiān)聽 accept、read、write 事件(Reactor),復(fù)用一個(gè)線程。

工作線程池:處理事件(Handler),由一個(gè)工作線程池來執(zhí)行業(yè)務(wù)邏輯,包括數(shù)據(jù)就緒后,用戶態(tài)的數(shù)據(jù)讀寫。

主從 Reactor 模式;

對于多個(gè) CPU 的機(jī)器,為充分利用系統(tǒng)資源,將 Reactor 拆分為兩部分:mainReactor 和 subReactor。

mainReactor:負(fù)責(zé)監(jiān)聽 server socket,用來處理網(wǎng)絡(luò)新連接的建立,將建立的 socketChannel 指定注冊給 subReactor,通常一個(gè)線程就可以處理;

subReactor:監(jiān)聽 accept、read、write 事件(Reactor),包括等待數(shù)據(jù)就緒時(shí),內(nèi)核態(tài)的數(shù)據(jù)讀寫,通常使用多線程。

工作線程:處理事件(Handler)可以和 subReactor 共同使用同一個(gè)線程,也可以做成線程池,類似上面多線程 Reactor 模式下的工作線程池的處理方式。

Proactor 模式

reactor 流程與 Reactor 模式類似

不同點(diǎn)就是

Reactor 是非阻塞同步網(wǎng)絡(luò)模式,感知的是就緒可讀寫事件。

在每次感知到有事件發(fā)生(比如可讀就緒事件)后,就需要應(yīng)用進(jìn)程主動調(diào)用 read 方法來完成數(shù)據(jù)的讀取,也就是要應(yīng)用進(jìn)程主動將 socket 接收緩存中的數(shù)據(jù)讀到應(yīng)用進(jìn)程內(nèi)存中,這個(gè)過程是同步的,讀取完數(shù)據(jù)后應(yīng)用進(jìn)程才能處理數(shù)據(jù)。

Proactor 是異步網(wǎng)絡(luò)模式,感知的是已完成的讀寫事件。

在發(fā)起異步讀寫請求時(shí),需要傳入數(shù)據(jù)緩沖區(qū)的地址(用來存放結(jié)果數(shù)據(jù))等信息,這樣系統(tǒng)內(nèi)核才可以自動幫我們把數(shù)據(jù)的讀寫工作完成,這里的讀寫工作全程由操作系統(tǒng)來做,并不需要像 Reactor 那樣還需要應(yīng)用進(jìn)程主動發(fā)起 read/write 來讀寫數(shù)據(jù),操作系統(tǒng)完成讀寫工作后,就會通知應(yīng)用進(jìn)程直接處理數(shù)據(jù)。

因此,Reactor 可以理解為「來了事件操作系統(tǒng)通知應(yīng)用進(jìn)程,讓應(yīng)用進(jìn)程來處理」,而 Proactor 可以理解為「來了事件操作系統(tǒng)來處理,處理完再通知應(yīng)用進(jìn)程」。

舉個(gè)實(shí)際生活中的例子,Reactor 模式就是快遞員在樓下,給你打電話告訴你快遞到你家小區(qū)了,你需要自己下樓來拿快遞。而在 Proactor 模式下,快遞員直接將快遞送到你家門口,然后通知你。

為什么 Redis 選擇單線程

Redis 中使用是單線程,可能處于以下幾方面的考慮

1、Redis 是純內(nèi)存的操作,執(zhí)行速度是非常快的,因此這部分操作通常不會是性能瓶頸,性能瓶頸在于網(wǎng)絡(luò) I/O;

2、避免過多的上下文切換開銷,單線程則可以規(guī)避進(jìn)程內(nèi)頻繁的線程切換開銷;

3、避免同步機(jī)制的開銷,多線程必然會面臨對于共享資源的訪問,這時(shí)候通常的做法就是加鎖,雖然是多線程,這時(shí)候就會變成串行的訪問。也就是多線程編程模式會面臨的共享資源的并發(fā)訪問控制問題;

4、簡單可維護(hù),多線程也會引入同步原語來保護(hù)共享資源的并發(fā)訪問,代碼的可維護(hù)性和易讀性將會下降。

Redis 在 v6.0 版本之前,Redis 的核心網(wǎng)絡(luò)模型一直是一個(gè)典型的單 Reactor 模型:利用 epoll/select/kqueue 等多路復(fù)用技術(shù),在單線程的事件循環(huán)中不斷去處理事件(客戶端請求),最后回寫響應(yīng)數(shù)據(jù)到客戶端:

這里來看下 Redis 如何使用單線程處理任務(wù)

Redis 中命令的原子性是什么

事件驅(qū)動框架對事件的捕獲分發(fā)

Redis 的網(wǎng)絡(luò)框架實(shí)現(xiàn)了 Reactor 模型,并且自行開發(fā)實(shí)現(xiàn)了一個(gè)事件驅(qū)動框架。

事件驅(qū)動框架的邏輯簡單點(diǎn)講就是

事件初始化;

事件捕獲;

分發(fā)和處理主循環(huán)。

Redis 中命令的原子性是什么

來看下 Redis 中事件驅(qū)動框架實(shí)現(xiàn)的幾個(gè)主要函數(shù)

//  執(zhí)行事件捕獲,分發(fā)和處理循環(huán)
void aeMain(aeEventLoop *eventLoop);
//  用來注冊監(jiān)聽的事件和事件對應(yīng)的處理函數(shù)。只有對事件和處理函數(shù)進(jìn)行了注冊,才能在事件發(fā)生時(shí)調(diào)用相應(yīng)的函數(shù)進(jìn)行處理。int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData);
// aeProcessEvents  函數(shù)實(shí)現(xiàn)的主要功能,包括捕獲事件、判斷事件類型和調(diào)用具體的事件處理函數(shù),從而實(shí)現(xiàn)事件的處理
int aeProcessEvents(aeEventLoop *eventLoop, int flags);

使用 aeMain 作為主循環(huán)來對事件進(jìn)行持續(xù)監(jiān)聽和捕獲,其中會調(diào)用 aeProcessEvents 函數(shù),實(shí)現(xiàn)事件捕獲、判斷事件類型和調(diào)用具體的事件處理函數(shù),從而實(shí)現(xiàn)事件的處理。

// https://github.com/redis/redis/blob/5.0/src/ae.c#L496
void aeMain(aeEventLoop *eventLoop) {
 eventLoop- stop = 0;
 while (!eventLoop- stop) { if (eventLoop- beforesleep != NULL)
 eventLoop- beforesleep(eventLoop);
 aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
 }
// https://github.com/redis/redis/blob/5.0/src/ae.c#L358
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
 ...
 if (eventLoop- maxfd != -1 || ((flags   AE_TIME_EVENTS)   !(flags   AE_DONT_WAIT))) {
 ...
 // 調(diào)用 aeApiPoll 函數(shù)捕獲事件
 numevents = aeApiPoll(eventLoop, tvp);
 ...
 }
 ...
}

可以看到 aeProcessEvents 中對于 IO 事件的捕獲是通過調(diào)用 aeApiPoll 來完成的。

aeApiPoll 是 I/O 多路復(fù)用 API,是基于 epoll_wait/select/kevent 等系統(tǒng)調(diào)用的封裝,監(jiān)聽等待讀寫事件觸發(fā),然后處理,它是事件循環(huán)(Event Loop)中的核心函數(shù),是事件驅(qū)動得以運(yùn)行的基礎(chǔ)。

Redis 是依賴于操作系統(tǒng)底層提供的 IO 多路復(fù)用機(jī)制,來實(shí)現(xiàn)事件捕獲,檢查是否有新的連接、讀寫事件發(fā)生。為了適配不同的操作系統(tǒng),Redis 對不同操作系統(tǒng)實(shí)現(xiàn)的網(wǎng)絡(luò) IO 多路復(fù)用函數(shù),都進(jìn)行了統(tǒng)一的封裝。

// https://github.com/redis/redis/blob/5.0/src/ae.c#L49
#ifdef HAVE_EVPORT
#include  ae_evport.c  // Solaris
#else
 #ifdef HAVE_EPOLL
 #include  ae_epoll.c  // Linux
 #else
 #ifdef HAVE_KQUEUE
 #include  ae_kqueue.c  // MacOS
 #else
 #include  ae_select.c  // Windows
 #endif
 #endif
#endif

ae_epoll.c:對應(yīng) Linux 上的 IO 復(fù)用函數(shù) epoll;

ae_evport.c:對應(yīng) Solaris 上的 IO 復(fù)用函數(shù) evport;

ae_kqueue.c:對應(yīng) macOS 或 FreeBSD 上的 IO 復(fù)用函數(shù) kqueue;

ae_select.c:對應(yīng) Linux(或 Windows)的 IO 復(fù)用函數(shù) select。

客戶端連接應(yīng)答

監(jiān)聽 socket 的讀事件, 當(dāng)有客戶端連接請求過來,使用函數(shù) acceptTcpHandler 和客戶端建立連接

當(dāng) Redis 啟動后,服務(wù)器程序的 main 函數(shù)會調(diào)用 initSever 函數(shù)來進(jìn)行初始化,而在初始化的過程中,aeCreateFileEvent 就會被 initServer 函數(shù)調(diào)用,用于注冊要監(jiān)聽的事件,以及相應(yīng)的事件處理函數(shù)。

// https://github.com/redis/redis/blob/5.0/src/server.c#L2036
void initServer(void) {
 ...
 //  創(chuàng)建一個(gè)事件處理程序以接受  TCP  和  Unix  中的新連接
 for (j = 0; j   server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
 acceptTcpHandler,NULL) == AE_ERR)
 {
 serverPanic(
  Unrecoverable error creating server.ipfd file event. 
 }
 }
 ...
}

可以看到 initServer 中會根據(jù)啟用的 IP 端口個(gè)數(shù),為每個(gè) IP 端口上的網(wǎng)絡(luò)事件,調(diào)用 aeCreateFileEvent,創(chuàng)建對 AE_READABLE 事件的監(jiān)聽,并且注冊 AE_READABLE 事件的處理 handler,也就是 acceptTcpHandler 函數(shù)。

然后看下 acceptTcpHandler 的實(shí)現(xiàn)

// https://github.com/redis/redis/blob/5.0/src/networking.c#L734
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
 char cip[NET_IP_STR_LEN];
 UNUSED(el);
 UNUSED(mask);
 UNUSED(privdata);
 while(max--) {
 //  用于 accept 客戶端的連接,其返回值是客戶端對應(yīng)的 socket
 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip),  cport);
 if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK)
 serverLog(LL_WARNING,
  Accepting client connection: %s , server.neterr);
 return;
 }
 serverLog(LL_VERBOSE, Accepted %s:%d , cip, cport);
 //  會調(diào)用 acceptCommonHandler 對連接以及客戶端進(jìn)行初始化
 acceptCommonHandler(cfd,0,cip);
 }
// https://github.com/redis/redis/blob/5.0/src/networking.c#L664 
static void acceptCommonHandler(int fd, int flags, char *ip) {
 client *c;
 //  分配并初始化新客戶端
 if ((c = createClient(fd)) == NULL) {
 serverLog(LL_WARNING,
  Error registering fd event for the new client: %s (fd=%d) ,
 strerror(errno),fd);
 close(fd); /* May be already closed, just ignore errors */
 return;
 }
 //  判斷當(dāng)前連接的客戶端是否超過最大值,如果超過的話,會拒絕這次連接。否則,更新客戶端連接數(shù)的計(jì)數(shù)
 if (listLength(server.clients)   server.maxclients) {
 char *err =  -ERR max number of clients reached\r\n 
 /* That s a best effort error message, don t check write errors */
 if (write(c- fd,err,strlen(err)) == -1) {
 /* Nothing to do, Just to avoid the warning... */
 }
 server.stat_rejected_conn++;
 freeClient(c);
 return;
 }
 ...
//  使用多路復(fù)用,需要記錄每個(gè)客戶端的狀態(tài),client  之前通過鏈表保存
typedef struct client {
int fd; //  字段是客戶端套接字文件描述符
sds querybuf; //  保存客戶端發(fā)來命令請求的輸入緩沖區(qū)。以 Redis 通信協(xié)議的方式保存
int argc; //  當(dāng)前命令的參數(shù)數(shù)量
robj **argv; //  當(dāng)前命令的參數(shù)
redisDb *db; //  當(dāng)前選擇的數(shù)據(jù)庫指針
int flags;
list *reply; //  保存命令回復(fù)的鏈表。因?yàn)殪o態(tài)緩沖區(qū)大小固定,主要保存固定長度的命令回復(fù),當(dāng)處理一些返回大量回復(fù)的命令,則會將命令回復(fù)以鏈表的形式連接起來。// ... many other fields ...
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
client *createClient(int fd) { client *c = zmalloc(sizeof(client));
 
 //  如果 fd 為 -1,表示創(chuàng)建的是一個(gè)無網(wǎng)絡(luò)連接的偽客戶端,用于執(zhí)行 lua 腳本的時(shí)候。 //  如果 fd 不等于 -1,表示創(chuàng)建一個(gè)有網(wǎng)絡(luò)連接的客戶端
 if (fd != -1) {
 //  設(shè)置 fd 為非阻塞模式
 anetNonBlock(NULL,fd);
 //  禁止使用  Nagle  算法,client 向內(nèi)核遞交的每個(gè)數(shù)據(jù)包都會立即發(fā)送給 server 出去,TCP_NODELAY
 anetEnableTcpNoDelay(NULL,fd);
 //  如果開啟了 tcpkeepalive,則設(shè)置  SO_KEEPALIVE
 if (server.tcpkeepalive)
 anetKeepAlive(NULL,fd,server.tcpkeepalive);
 //  創(chuàng)建一個(gè)文件事件狀態(tài) el,且監(jiān)聽讀事件,開始接受命令的輸入
 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
 readQueryFromClient, c) == AE_ERR)
 { close(fd);
 zfree(c);
 return NULL;
 }
 }
 ...
 //  初始化 client  中的參數(shù)
 return c;
}

1、acceptTcpHandler 主要用于處理和客戶端連接的建立;

2、其中會調(diào)用函數(shù) anetTcpAccept 用于 accept 客戶端的連接,其返回值是客戶端對應(yīng)的 socket;

3、然后調(diào)用 acceptCommonHandler 對連接以及客戶端進(jìn)行初始化;

4、初始化客戶端的時(shí)候,同時(shí)使用 aeCreateFileEvent 用來注冊監(jiān)聽的事件和事件對應(yīng)的處理函數(shù),將 readQueryFromClient 命令讀取處理器綁定到新連接對應(yīng)的文件描述符上;

5、服務(wù)器會監(jiān)聽該文件描述符的讀事件,當(dāng)客戶端發(fā)送了命令,觸發(fā)了 AE_READABLE 事件,那么就會調(diào)用回調(diào)函數(shù) readQueryFromClient() 來從文件描述符 fd 中讀發(fā)來的命令,并保存在輸入緩沖區(qū)中 querybuf。

命令的接收

readQueryFromClient 是請求處理的起點(diǎn), 解析并執(zhí)行客戶端的請求命令。

// https://github.com/redis/redis/blob/5.0/src/networking.c#L1522
//  讀取 client 的輸入緩沖區(qū)的內(nèi)容
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata;
 int nread, readlen;
 size_t qblen;
 UNUSED(el);
 UNUSED(mask);
 ...
 //  輸入緩沖區(qū)的長度
 qblen = sdslen(c- querybuf);
 //  更新緩沖區(qū)的峰值
 if (c- querybuf_peak   qblen) c- querybuf_peak = qblen;
 //  擴(kuò)展緩沖區(qū)的大小
 c- querybuf = sdsMakeRoomFor(c- querybuf, readlen);
 //  調(diào)用 read 從描述符為 fd 的客戶端 socket 中讀取數(shù)據(jù)
 nread = read(fd, c- querybuf+qblen, readlen);
 ...
 //  處理讀取的內(nèi)容
 processInputBufferAndReplicate(c);
// https://github.com/redis/redis/blob/5.0/src/networking.c#L1507
void processInputBufferAndReplicate(client *c) {
 //  當(dāng)前客戶端不屬于主從復(fù)制中的 Master 
 //  直接調(diào)用  processInputBuffer,對客戶端輸入緩沖區(qū)中的命令和參數(shù)進(jìn)行解析
 if (!(c- flags   CLIENT_MASTER)) { processInputBuffer(c);
 //  客戶端屬于主從復(fù)制中的 Master 
 //  調(diào)用 processInputBuffer 函數(shù),解析客戶端命令, 
 //  調(diào)用 replicationFeedSlavesFromMasterStream  函數(shù),將主節(jié)點(diǎn)接收到的命令同步給從節(jié)點(diǎn)
 } else {
 size_t prev_offset = c- reploff;
 processInputBuffer(c);
 size_t applied = c- reploff - prev_offset;
 if (applied) {
 replicationFeedSlavesFromMasterStream(server.slaves,
 c- pending_querybuf, applied);
 sdsrange(c- pending_querybuf,applied,-1);
 }
 }
// https://github.com/redis/redis/blob/5.0/src/networking.c#L1428
void processInputBuffer(client *c) {
 server.current_client = c;
 /* Keep processing while there is something in the input buffer */
 //  持續(xù)讀取緩沖區(qū)的內(nèi)容
 while(c- qb_pos   sdslen(c- querybuf)) {
 ...
 /* Multibulk processing could see a  = 0 length. */
 //  如果參數(shù)為 0,則重置 client
 if (c- argc == 0) { resetClient(c);
 } else {
 /* Only reset the client when the command was executed. */
 //  執(zhí)行命令成功后重置 client
 if (processCommand(c) == C_OK) { if (c- flags   CLIENT_MASTER   !(c- flags   CLIENT_MULTI)) {
 /* Update the applied replication offset of our master. */
 c- reploff = c- read_reploff - sdslen(c- querybuf) + c- qb_pos;
 }
 //  命令處于阻塞狀態(tài)中的客戶端,不需要進(jìn)行重置
 if (!(c- flags   CLIENT_BLOCKED) || c- btype != BLOCKED_MODULE)
 resetClient(c);
 }
 /* freeMemoryIfNeeded may flush slave output buffers. This may
 * result into a slave, that may be the active client, to be
 * freed. */
 if (server.current_client == NULL) break;
 }
 }
 /* Trim to pos */
 if (server.current_client != NULL   c- qb_pos) { sdsrange(c- querybuf,c- qb_pos,-1);
 c- qb_pos = 0;
 }
 server.current_client = NULL;
}

1、readQueryFromClient(),從文件描述符 fd 中讀出數(shù)據(jù)到輸入緩沖區(qū) querybuf 中;

2、使用 processInputBuffer 函數(shù)完成對命令的解析,在其中使用 processInlineBuffer 或者 processMultibulkBuffer 根據(jù) Redis 協(xié)議解析命令;

3、完成對一個(gè)命令的解析,就使用 processCommand 對命令就行執(zhí)行;

4、命令執(zhí)行完成,最后調(diào)用 addReply 函數(shù)族的一系列函數(shù)將響應(yīng)數(shù)據(jù)寫入到對應(yīng) client 的寫出緩沖區(qū):client- buf 或者 client- reply,client- buf 是首選的寫出緩沖區(qū),固定大小 16KB,一般來說可以緩沖足夠多的響應(yīng)數(shù)據(jù),但是如果客戶端在時(shí)間窗口內(nèi)需要響應(yīng)的數(shù)據(jù)非常大,那么則會自動切換到 client- reply 鏈表上去,使用鏈表理論上能夠保存無限大的數(shù)據(jù)(受限于機(jī)器的物理內(nèi)存),最后把 client 添加進(jìn)一個(gè) LIFO 隊(duì)列 clients_pending_write;

命令的回復(fù)

在 Redis 事件驅(qū)動框架每次循環(huán)進(jìn)入事件處理函數(shù)前,來處理監(jiān)聽到的已觸發(fā)事件或是到時(shí)的時(shí)間事件之前,都會調(diào)用 beforeSleep 函數(shù),進(jìn)行一些任務(wù)處理,這其中就包括了調(diào)用 handleClientsWithPendingWrites 函數(shù),它會將 Redis sever 客戶端緩沖區(qū)中的數(shù)據(jù)寫回客戶端。

// https://github.com/redis/redis/blob/5.0/src/server.c#L1380
void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop);
 ...
 //  將  Redis sever  客戶端緩沖區(qū)中的數(shù)據(jù)寫回客戶端
 handleClientsWithPendingWrites();
 ...
// https://github.com/redis/redis/blob/5.0/src/networking.c#L1082
int handleClientsWithPendingWrites(void) {
 listIter li;
 listNode *ln;
 //  遍歷  clients_pending_write  隊(duì)列,調(diào)用  writeToClient  把  client  的寫出緩沖區(qū)里的數(shù)據(jù)回寫到客戶端
 int processed = listLength(server.clients_pending_write);
 listRewind(server.clients_pending_write, li);
 while((ln = listNext( li))) { client *c = listNodeValue(ln);
 c- flags  = ~CLIENT_PENDING_WRITE;
 listDelNode(server.clients_pending_write,ln);
 ...
 //  調(diào)用  writeToClient  函數(shù),將客戶端輸出緩沖區(qū)中的數(shù)據(jù)寫回
 if (writeToClient(c- fd,c,0) == C_ERR) continue;
 //  如果輸出緩沖區(qū)的數(shù)據(jù)還沒有寫完,此時(shí),handleClientsWithPendingWrites  函數(shù)就
 //  會調(diào)用  aeCreateFileEvent  函數(shù),創(chuàng)建可寫事件,并設(shè)置回調(diào)函數(shù)  sendReplyToClien
 if (clientHasPendingReplies(c)) {
 int ae_flags = AE_WRITABLE;
 if (server.aof_state == AOF_ON  
 server.aof_fsync == AOF_FSYNC_ALWAYS)
 {
 ae_flags |= AE_BARRIER;
 }
 //  將文件描述符 fd 和 AE_WRITABLE 事件關(guān)聯(lián)起來,當(dāng)客戶端可寫時(shí),就會觸發(fā)事件,調(diào)用 sendReplyToClient() 函數(shù),執(zhí)行寫事件
 if (aeCreateFileEvent(server.el, c- fd, ae_flags,
 sendReplyToClient, c) == AE_ERR)
 { freeClientAsync(c);
 }
 }
 }
 return processed;
// https://github.com/redis/redis/blob/5.0/src/networking.c#L1072
//  寫事件處理程序,只是發(fā)送回復(fù)給 client
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el);
 UNUSED(mask);
 writeToClient(fd,privdata,1);
// https://github.com/redis/redis/blob/5.0/src/networking.c#L979
//  將輸出緩沖區(qū)的數(shù)據(jù)寫給 client,如果 client 被釋放則返回 C_ERR,沒被釋放則返回 C_OK
int writeToClient(int fd, client *c, int handler_installed) {
 ssize_t nwritten = 0, totwritten = 0;
 size_t objlen;
 clientReplyBlock *o;
 
 //  如果指定的 client 的回復(fù)緩沖區(qū)中還有數(shù)據(jù),則返回真,表示可以寫 socket
 while(clientHasPendingReplies(c)) {
 //  固定緩沖區(qū)發(fā)送未完成
 if (c- bufpos   0) {
 //  將緩沖區(qū)的數(shù)據(jù)寫到 fd 中
 nwritten = write(fd,c- buf+c- sentlen,c- bufpos-c- sentlen);
 ...
 //  如果發(fā)送的數(shù)據(jù)等于 buf 的偏移量,表示發(fā)送完成
 if ((int)c- sentlen == c- bufpos) {
 c- bufpos = 0;
 c- sentlen = 0;
 }
 //  固定緩沖區(qū)發(fā)送完成,發(fā)送回復(fù)鏈表的內(nèi)容
 } else {
 //  回復(fù)鏈表的第一條回復(fù)對象,和對象值的長度和所占的內(nèi)存
 o = listNodeValue(listFirst(c- reply));
 objlen = o- used;
 if (objlen == 0) {
 c- reply_bytes -= o- size;
 listDelNode(c- reply,listFirst(c- reply));
 continue;
 }
 //  將當(dāng)前節(jié)點(diǎn)的值寫到 fd 中
 nwritten = write(fd, o- buf + c- sentlen, objlen - c- sentlen);
 if (nwritten  = 0) break;
 c- sentlen += nwritten;
 totwritten += nwritten;
 ...
 }
 ...
 }
 ...
 //  如果指定的 client 的回復(fù)緩沖區(qū)中已經(jīng)沒有數(shù)據(jù),發(fā)送完成
 if (!clientHasPendingReplies(c)) {
 c- sentlen = 0;
 //  刪除當(dāng)前 client 的可讀事件的監(jiān)聽
 if (handler_installed) aeDeleteFileEvent(server.el,c- fd,AE_WRITABLE);
 /* Close connection after entire reply has been sent. */
 //  如果指定了寫入按成之后立即關(guān)閉的標(biāo)志,則釋放 client
 if (c- flags   CLIENT_CLOSE_AFTER_REPLY) { freeClient(c);
 return C_ERR;
 }
 }
 return C_OK;
}

1、beforeSleep 函數(shù)調(diào)用的 handleClientsWithPendingWrites 函數(shù),會遍歷 clients_pending_write(待寫回?cái)?shù)據(jù)的客戶端) 隊(duì)列,調(diào)用 writeToClient 把 client 的寫出緩沖區(qū)里的數(shù)據(jù)回寫到客戶端,然后調(diào)用 writeToClient 函數(shù),將客戶端輸出緩沖區(qū)中的數(shù)據(jù)發(fā)送給客戶端;

2、如果輸出緩沖區(qū)的數(shù)據(jù)還沒有寫完,此時(shí),handleClientsWithPendingWrites 函數(shù)就會調(diào)用 aeCreateFileEvent 函數(shù),注冊 sendReplyToClient 到該連接的寫就緒事件,等待將后續(xù)將數(shù)據(jù)寫回給客戶端。

上面的執(zhí)行流程總結(jié)下來就是

1、Redis Server 啟動后,主線程會啟動一個(gè)時(shí)間循環(huán) (Event Loop), 持續(xù)監(jiān)聽事件;

2、client 到 server 的新連接,會調(diào)用 acceptTcpHandler 函數(shù),之后會注冊讀事件 readQueryFromClient 函數(shù),client 發(fā)給 server 的數(shù)據(jù),都會在這個(gè)函數(shù)處理,這個(gè)函數(shù)會解析 client 的數(shù)據(jù),找到對應(yīng)的 cmd 函數(shù)執(zhí)行;

3、cmd 邏輯執(zhí)行完成后,server 需要寫回?cái)?shù)據(jù)給 client,調(diào)用 addReply 函數(shù)族的一系列函數(shù)將響應(yīng)數(shù)據(jù)寫入到對應(yīng) client 的寫出緩沖區(qū):client- buf 或者 client- reply,client- buf 是首選的寫出緩沖區(qū),固定大小 16KB,一般來說可以緩沖足夠多的響應(yīng)數(shù)據(jù),但是如果客戶端在時(shí)間窗口內(nèi)需要響應(yīng)的數(shù)據(jù)非常大,那么則會自動切換到 client- reply 鏈表上去,使用鏈表理論上能夠保存無限大的數(shù)據(jù)(受限于機(jī)器的物理內(nèi)存),最后把 client 添加進(jìn)一個(gè) LIFO 隊(duì)列 clients_pending_write;

4、在 Redis 事件驅(qū)動框架每次循環(huán)進(jìn)入事件處理函數(shù)前,來處理監(jiān)聽到的已觸發(fā)事件或是到時(shí)的時(shí)間事件之前,都會調(diào)用 beforeSleep 函數(shù),進(jìn)行一些任務(wù)處理,這其中就包括了調(diào)用 handleClientsWithPendingWrites 函數(shù),它會將 Redis sever 客戶端緩沖區(qū)中的數(shù)據(jù)寫回客戶端;

beforeSleep 函數(shù)調(diào)用的 handleClientsWithPendingWrites 函數(shù),會遍歷 clients_pending_write(待寫回?cái)?shù)據(jù)的客戶端) 隊(duì)列,調(diào)用 writeToClient 把 client 的寫出緩沖區(qū)里的數(shù)據(jù)回寫到客戶端,然后調(diào)用 writeToClient 函數(shù),將客戶端輸出緩沖區(qū)中的數(shù)據(jù)發(fā)送給客戶端;

如果輸出緩沖區(qū)的數(shù)據(jù)還沒有寫完,此時(shí),handleClientsWithPendingWrites 函數(shù)就會調(diào)用 aeCreateFileEvent 函數(shù),注冊 sendReplyToClient 到該連接的寫就緒事件,等待將后續(xù)將數(shù)據(jù)寫回給客戶端。

Redis 多 IO 線程

在 Redis6.0 的版本中,引入了多線程來處理 IO 任務(wù),多線程的引入,充分利用了當(dāng)前服務(wù)器多核特性,使用多核運(yùn)行多線程,讓多線程幫助加速數(shù)據(jù)讀取、命令解析以及數(shù)據(jù)寫回的速度,提升 Redis 整體性能。

Redis6.0 之前的版本用的是單線程 Reactor 模式,所有的操作都在一個(gè)線程中完成,6.0 之后的版本使用了主從 Reactor 模式。

由一個(gè) mainReactor 線程接收連接,然后發(fā)送給多個(gè) subReactor 線程處理,subReactor 負(fù)責(zé)處理具體的業(yè)務(wù)。

來看下 Redis 多 IO 線程的具體實(shí)現(xiàn)過程

Redis 中命令的原子性是什么

多 IO 線程的初始化

使用 initThreadedIO 函數(shù)來初始化多 IO 線程。

// https://github.com/redis/redis/blob/6.2/src/networking.c#L3573
void initThreadedIO(void) {
 server.io_threads_active = 0; /* We start with threads not active. */
 /* Don t spawn any thread if the user selected a single thread:
 * we ll handle I/O directly from the main thread. */
 //  如果用戶只配置了一個(gè)  I/O  線程,不需要創(chuàng)建新線程了,直接在主線程中處理
 if (server.io_threads_num == 1) return;
 if (server.io_threads_num   IO_THREADS_MAX_NUM) {
 serverLog(LL_WARNING, Fatal: too many I/O threads configured.  
  The maximum number is %d. , IO_THREADS_MAX_NUM);
 exit(1);
 }
 /* Spawn and initialize the I/O threads. */
 //  初始化線程
 for (int i = 0; i   server.io_threads_num; i++) {
 /* Things we do for all the threads including the main thread. */
 io_threads_list[i] = listCreate();
 //  編號為 0 是主線程
 if (i == 0) continue; /* Thread 0 is the main thread. */
 /* Things we do only for the additional threads. */
 pthread_t tid;
 //  初始化 io_threads_mutex 數(shù)組
 pthread_mutex_init(io_threads_mutex[i],NULL);
 //  初始化 io_threads_pending 數(shù)組
 setIOPendingCount(i, 0);
 //  主線程在啟動  I/O  線程的時(shí)候會默認(rèn)先鎖住它,直到有  I/O  任務(wù)才喚醒它。 pthread_mutex_lock(io_threads_mutex[i]); /* Thread will be stopped. */
 //  調(diào)用 pthread_create 函數(shù)創(chuàng)建 IO 線程,線程運(yùn)行函數(shù)為 IOThreadMain
 if (pthread_create( tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
 serverLog(LL_WARNING, Fatal: Can t initialize IO thread. 
 exit(1);
 }
 io_threads[i] = tid;
 }
}

可以看到在 initThreadedIO 中完成了對下面四個(gè)數(shù)組的初始化工作

io_threads_list 數(shù)組:保存了每個(gè) IO 線程要處理的客戶端,將數(shù)組每個(gè)元素初始化為一個(gè) List 類型的列表;

io_threads_pending 數(shù)組:保存等待每個(gè) IO 線程處理的客戶端個(gè)數(shù);

io_threads_mutex 數(shù)組:保存線程互斥鎖;

io_threads 數(shù)組:保存每個(gè) IO 線程的描述符。

命令的接收

Redis server 在和一個(gè)客戶端建立連接后,就開始了監(jiān)聽客戶端的可讀事件,處理可讀事件的回調(diào)函數(shù)就是 readQueryFromClient

// https://github.com/redis/redis/blob/6.2/src/networking.c#L2219
void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn);
 int nread, readlen;
 size_t qblen;
 /* Check if we want to read from the client later when exiting from
 * the event loop. This is the case if threaded I/O is enabled. */
 //  判斷是否從客戶端延遲讀取數(shù)據(jù)
 if (postponeClientRead(c)) return;
 ...
// https://github.com/redis/redis/blob/6.2/src/networking.c#L3746
int postponeClientRead(client *c) {
 //  當(dāng)多線程  I/O  模式開啟、主線程沒有在處理阻塞任務(wù)時(shí),將  client  加入異步隊(duì)列。 if (server.io_threads_active  
 server.io_threads_do_reads  
 !ProcessingEventsWhileBlocked  
 !(c- flags   (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) 
 {
 //  給客戶端的 flag 添加 CLIENT_PENDING_READ 標(biāo)記,表示推遲該客戶端的讀操作
 c- flags |= CLIENT_PENDING_READ;
 //  將可獲得加入 clients_pending_write 列表
 listAddNodeHead(server.clients_pending_read,c);
 return 1;
 } else {
 return 0;
 }
}

使用 clients_pending_read 保存了需要進(jìn)行延遲讀操作的客戶端之后,這些客戶端又是如何分配給多 IO 線程執(zhí)行的呢?

handleClientsWithPendingWritesUsingThreads 函數(shù):該函數(shù)主要負(fù)責(zé)將 clients_pending_write 列表中的客戶端分配給 IO 線程進(jìn)行處理。

看下如何實(shí)現(xiàn)

// https://github.com/redis/redis/blob/6.2/src/networking.c#L3766
int handleClientsWithPendingReadsUsingThreads(void) {
 //  當(dāng)多線程  I/O  模式開啟, 才能執(zhí)行下面的流程
 if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
 int processed = listLength(server.clients_pending_read);
 if (processed == 0) return 0;
 //  遍歷待讀取的  client  隊(duì)列  clients_pending_read, //  根據(jù) IO 線程的數(shù)量,讓 clients_pending_read 中客戶端數(shù)量對 IO 線程進(jìn)行取模運(yùn)算
 //  取模的結(jié)果就是客戶端分配給對應(yīng) IO 線程的編號
 listIter li;
 listNode *ln;
 listRewind(server.clients_pending_read, li);
 int item_id = 0;
 while((ln = listNext( li))) { client *c = listNodeValue(ln);
 int target_id = item_id % server.io_threads_num;
 listAddNodeTail(io_threads_list[target_id],c);
 item_id++;
 }
 //  設(shè)置當(dāng)前  I/O  操作為讀取操作,給每個(gè)  I/O  線程的計(jì)數(shù)器設(shè)置分配的任務(wù)數(shù)量, //  讓  I/O  線程可以開始工作:只讀取和解析命令,不執(zhí)行
 io_threads_op = IO_THREADS_OP_READ;
 for (int j = 1; j   server.io_threads_num; j++) { int count = listLength(io_threads_list[j]);
 setIOPendingCount(j, count);
 }
 //  主線程自己也會去執(zhí)行讀取客戶端請求命令的任務(wù),以達(dá)到最大限度利用  CPU。 listRewind(io_threads_list[0], li);
 while((ln = listNext( li))) { client *c = listNodeValue(ln);
 readQueryFromClient(c- conn);
 }
 listEmpty(io_threads_list[0]);
 //  忙輪詢,等待所有  IO  線程完成待讀客戶端的處理
 while(1) {
 unsigned long pending = 0;
 for (int j = 1; j   server.io_threads_num; j++)
 pending += getIOPendingCount(j);
 if (pending == 0) break;
 }
 //  遍歷待讀取的  client  隊(duì)列,清除  CLIENT_PENDING_READ 標(biāo)記, //  然后解析并執(zhí)行所有  client  的命令。 while(listLength(server.clients_pending_read)) { ln = listFirst(server.clients_pending_read);
 client *c = listNodeValue(ln);
 c- flags  = ~CLIENT_PENDING_READ;
 listDelNode(server.clients_pending_read,ln);
 serverAssert(!(c- flags   CLIENT_BLOCKED));
 // client  的第一條命令已經(jīng)被解析好了,直接嘗試執(zhí)行。 if (processPendingCommandsAndResetClient(c) == C_ERR) {
 /* If the client is no longer valid, we avoid
 * processing the client later. So we just go
 * to the next. */
 continue;
 }
 //  解析并執(zhí)行  client  命令
 processInputBuffer(c);
 //  命令執(zhí)行完成之后,如果  client  中有響應(yīng)數(shù)據(jù)需要回寫到客戶端,則將  client  加入到待寫出隊(duì)列  clients_pending_write
 if (!(c- flags   CLIENT_PENDING_WRITE)   clientHasPendingReplies(c))
 clientInstallWriteHandler(c);
 }
 /* Update processed count on server */
 server.stat_io_reads_processed += processed;
 return processed;
}

1、當(dāng)客戶端發(fā)送命令請求之后,會觸發(fā) Redis 主線程的事件循環(huán),命令處理器 readQueryFromClient 被回調(diào),多線程模式下,則會把 client 加入到 clients_pending_read 任務(wù)隊(duì)列中去,后面主線程再分配到 I/O 線程去讀取客戶端請求命令;

2、主線程會根據(jù) clients_pending_read 中客戶端數(shù)量對 IO 線程進(jìn)行取模運(yùn)算,取模的結(jié)果就是客戶端分配給對應(yīng) IO 線程的編號;

3、忙輪詢,等待所有的線程完成讀取客戶端命令的操作,這一步用到了多線程的請求;

4、遍歷 clients_pending_read,執(zhí)行所有 client 的命令,這里就是在主線程中執(zhí)行的,命令的執(zhí)行是單線程的操作。

命令的回復(fù)

完成命令的讀取、解析以及執(zhí)行之后,客戶端命令的響應(yīng)數(shù)據(jù)已經(jīng)存入 client- buf 或者 client- reply 中。

主循環(huán)在捕獲 IO 事件的時(shí)候,beforeSleep 函數(shù)會被調(diào)用,進(jìn)而調(diào)用 handleClientsWithPendingWritesUsingThreads,寫回響應(yīng)數(shù)據(jù)給客戶端。

// https://github.com/redis/redis/blob/6.2/src/networking.c#L3662
int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write);
 if (processed == 0) return 0; /* Return ASAP if there are no clients. */
 //  如果用戶設(shè)置的  I/O  線程數(shù)等于  1  或者當(dāng)前  clients_pending_write  隊(duì)列中待寫出的  client
 //  數(shù)量不足  I/O  線程數(shù)的兩倍,則不用多線程的邏輯,讓所有  I/O  線程進(jìn)入休眠, //  直接在主線程把所有  client  的相應(yīng)數(shù)據(jù)回寫到客戶端。 if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) { return handleClientsWithPendingWrites();
 }
 //  喚醒正在休眠的  I/O  線程(如果有的話)。 if (!server.io_threads_active) startThreadedIO();
 /* Distribute the clients across N different lists. */
 //  和上面的 handleClientsWithPendingReadsUsingThreads 中的操作一樣分配客戶端給 IO 線程
 listIter li;
 listNode *ln;
 listRewind(server.clients_pending_write, li);
 int item_id = 0;
 while((ln = listNext( li))) { client *c = listNodeValue(ln);
 c- flags  = ~CLIENT_PENDING_WRITE;
 /* Remove clients from the list of pending writes since
 * they are going to be closed ASAP. */
 if (c- flags   CLIENT_CLOSE_ASAP) { listDelNode(server.clients_pending_write, ln);
 continue;
 }
 int target_id = item_id % server.io_threads_num;
 listAddNodeTail(io_threads_list[target_id],c);
 item_id++;
 }
 //  設(shè)置當(dāng)前  I/O  操作為寫出操作,給每個(gè)  I/O  線程的計(jì)數(shù)器設(shè)置分配的任務(wù)數(shù)量, //  讓  I/O  線程可以開始工作,把寫出緩沖區(qū)(client- buf  或  c- reply)中的響應(yīng)數(shù)據(jù)回寫到客戶端。 //  可以看到寫回操作也是多線程執(zhí)行的
 io_threads_op = IO_THREADS_OP_WRITE;
 for (int j = 1; j   server.io_threads_num; j++) { int count = listLength(io_threads_list[j]);
 setIOPendingCount(j, count);
 }
 //  主線程自己也會去執(zhí)行讀取客戶端請求命令的任務(wù),以達(dá)到最大限度利用  CPU。 listRewind(io_threads_list[0], li);
 while((ln = listNext( li))) { client *c = listNodeValue(ln);
 writeToClient(c,0);
 }
 listEmpty(io_threads_list[0]);
 /* Wait for all the other threads to end their work. */
 //  等待所有的線程完成對應(yīng)的工作
 while(1) {
 unsigned long pending = 0;
 for (int j = 1; j   server.io_threads_num; j++)
 pending += getIOPendingCount(j);
 if (pending == 0) break;
 }
 //  最后再遍歷一次  clients_pending_write  隊(duì)列,檢查是否還有  client  的寫出緩沖區(qū)中有殘留數(shù)據(jù), //  如果有,那就為  client  注冊一個(gè)命令回復(fù)器  sendReplyToClient,等待客戶端寫就緒再繼續(xù)把數(shù)據(jù)回寫。 listRewind(server.clients_pending_write, li);
 while((ln = listNext( li))) { client *c = listNodeValue(ln);
 //  檢查  client  的寫出緩沖區(qū)是否還有遺留數(shù)據(jù)。 if (clientHasPendingReplies(c)  
 connSetWriteHandler(c- conn, sendReplyToClient) == AE_ERR)
 { freeClientAsync(c);
 }
 }
 listEmpty(server.clients_pending_write);
 /* Update processed count on server */
 server.stat_io_writes_processed += processed;
 return processed;
}

1、也是會將 client 分配給所有的 IO 線程;

2、忙輪詢,等待所有的線程將緩存中的數(shù)據(jù)寫回給客戶端,這里寫回操作使用的多線程;

3、最后再遍歷 clients_pending_write,為那些還殘留有響應(yīng)數(shù)據(jù)的 client 注冊命令回復(fù)處理器 sendReplyToClient,等待客戶端可寫之后在事件循環(huán)中繼續(xù)回寫殘余的響應(yīng)數(shù)據(jù)。

通過上面的分析可以得出結(jié)論,Redis 多 IO 線程中多線程的應(yīng)用

1、解析客戶端的命令的時(shí)候用到了多線程,但是對于客戶端命令的執(zhí)行,使用的還是單線程;

2、給客戶端回復(fù)數(shù)據(jù)的時(shí)候,使用到了多線程。

來總結(jié)下 Redis 中多線程的執(zhí)行過程

1、Redis Server 啟動后,主線程會啟動一個(gè)時(shí)間循環(huán) (Event Loop), 持續(xù)監(jiān)聽事件;

2、client 到 server 的新連接,會調(diào)用 acceptTcpHandler 函數(shù),之后會注冊讀事件 readQueryFromClient 函數(shù),client 發(fā)給 server 的數(shù)據(jù),都會在這個(gè)函數(shù)處理;

3、客戶端發(fā)送給服務(wù)端的數(shù)據(jù),不會類似 6.0 之前的版本使用 socket 直接去讀,而是會將 client 放入到 clients_pending_read 中,里面保存了需要進(jìn)行延遲讀操作的客戶端;

4、處理 clients_pending_read 的函數(shù) handleClientsWithPendingReadsUsingThreads,在每次事件循環(huán)的時(shí)候都會調(diào)用;

1、主線程會根據(jù) clients_pending_read 中客戶端數(shù)量對 IO 線程進(jìn)行取模運(yùn)算,取模的結(jié)果就是客戶端分配給對應(yīng) IO 線程的編號;

2、忙輪詢,等待所有的線程完成讀取客戶端命令的操作,這一步用到了多線程的請求;

3、遍歷 clients_pending_read,執(zhí)行所有 client 的命令,這里就是在主線程中執(zhí)行的,命令的執(zhí)行是單線程的操作。

5、命令執(zhí)行完成以后,回復(fù)的內(nèi)容還是會被寫入到 client 的緩存區(qū)中,這些 client 和 6.0 之前的版本處理方式一樣,也是會被放入到 clients_pending_write(待寫回?cái)?shù)據(jù)的客戶端);

6、6.0 對于 clients_pending_write 的處理使用到了多線程;

1、也是會將 client 分配給所有的 IO 線程;

2、忙輪詢,等待所有的線程將緩存中的數(shù)據(jù)寫回給客戶端,這里寫回操作使用的多線程;

3、最后再遍歷 clients_pending_write,為那些還殘留有響應(yīng)數(shù)據(jù)的 client 注冊命令回復(fù)處理器 sendReplyToClient,等待客戶端可寫之后在事件循環(huán)中繼續(xù)回寫殘余的響應(yīng)數(shù)據(jù)。

原子性的單命令

通過上面的分析,我們知道,Redis 的主線程是單線程執(zhí)行的,所有 Redis 中的單命令,都是原子性的。

所以對于一些場景的操作盡量去使用 Redis 中單命令去完成,就能保證命令執(zhí)行的原子性。

比如對于上面的讀取 - 修改 - 寫回操作可以使用 Redis 中的原子計(jì)數(shù)器, INCRBY(自增)、DECRBR(自減)、INCR(加 1)和 DECR(減 1)等命令。

這些命令可以直接幫助我們處理并發(fā)控制

127.0.0.1:6379  incr test-1
(integer) 1
127.0.0.1:6379  incr test-1
(integer) 2
127.0.0.1:6379  incr test-1
(integer) 3

分析下源碼,看看這個(gè)命令是如何實(shí)現(xiàn)的

// https://github.com/redis/redis/blob/6.2/src/t_string.c#L617
void incrCommand(client *c) { incrDecrCommand(c,1);
void decrCommand(client *c) { incrDecrCommand(c,-1);
void incrbyCommand(client *c) {
 long long incr;
 if (getLongLongFromObjectOrReply(c, c- argv[2],  incr, NULL) != C_OK) return;
 incrDecrCommand(c,incr);
void decrbyCommand(client *c) {
 long long incr;
 if (getLongLongFromObjectOrReply(c, c- argv[2],  incr, NULL) != C_OK) return;
 incrDecrCommand(c,-incr);
}

可以看到 INCRBY(自增)、DECRBR(自減)、INCR(加 1)和 DECR(減 1)這幾個(gè)命令最終都是調(diào)用的 incrDecrCommand

// https://github.com/redis/redis/blob/6.2/src/t_string.c#L579 
void incrDecrCommand(client *c, long long incr) {
 long long value, oldvalue;
 robj *o, *new;
 //  查找有沒有對應(yīng)的鍵值
 o = lookupKeyWrite(c- db,c- argv[1]);
 //  判斷類型,如果 value 對象不是字符串類型,直接返回
 if (checkType(c,o,OBJ_STRING)) return;
 //  將字符串類型的 value 轉(zhuǎn)換為 longlong 類型保存在 value 中
 if (getLongLongFromObjectOrReply(c,o, value,NULL) != C_OK) return;
 //  備份舊的 value
 oldvalue = value;
 //  判斷  incr  的值是否超過 longlong 類型所能表示的范圍
 //  長度的范圍,十進(jìn)制  64  位有符號整數(shù)
 if ((incr   0   oldvalue   0   incr   (LLONG_MIN-oldvalue)) ||
 (incr   0   oldvalue   0   incr   (LLONG_MAX-oldvalue))) {
 addReplyError(c, increment or decrement would overflow 
 return;
 }
 //  計(jì)算新的  value 值
 value += incr;
 if (o   o- refcount == 1   o- encoding == OBJ_ENCODING_INT  
 (value   0 || value  = OBJ_SHARED_INTEGERS)  
 value  = LONG_MIN   value  = LONG_MAX)
 {
 new = o;
 o- ptr = (void*)((long)value);
 } else { new = createStringObjectFromLongLongForValue(value);
 //  如果之前的  value  對象存在
 if (o) {
 //  重寫為  new  的值  
 dbOverwrite(c- db,c- argv[1],new);
 } else {
 //  如果之前沒有對應(yīng)的  value, 新設(shè)置  value  的值
 dbAdd(c- db,c- argv[1],new);
 }
 }
 //  進(jìn)行通知
 signalModifiedKey(c,c- db,c- argv[1]);
 notifyKeyspaceEvent(NOTIFY_STRING, incrby ,c- argv[1],c- db- 
 server.dirty++;
 addReply(c,shared.colon);
 addReply(c,new);
 addReply(c,shared.crlf);
}

感謝各位的閱讀,以上就是“Redis 中命令的原子性是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對 Redis 中命令的原子性是什么這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-07-13發(fā)表,共計(jì)27378字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 司法| 治多县| 明星| 阿图什市| 岑溪市| 大埔区| 鞍山市| 平度市| 文安县| 盐池县| 舒兰市| 安新县| 陵水| 临沧市| 乐平市| 印江| 侯马市| 莫力| 衡东县| 平顺县| 桂阳县| 广安市| 洪洞县| 石楼县| 宝清县| 麟游县| 茂名市| 宽甸| 泽普县| 巴马| 临洮县| 淅川县| 云和县| 星子县| 镇宁| 禄劝| 徐州市| 宜宾县| 东宁县| 额尔古纳市| 阿拉善右旗|