共計(jì) 18883 個(gè)字符,預(yù)計(jì)需要花費(fèi) 48 分鐘才能閱讀完成。
本篇內(nèi)容主要講解“如何深入理解 Redis 事務(wù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓丸趣 TV 小編來帶大家學(xué)習(xí)“如何深入理解 Redis 事務(wù)”吧!
Redis 可以看成 NoSQL 類型的數(shù)據(jù)庫系統(tǒng), Redis 也提供了事務(wù), 但是和傳統(tǒng)的關(guān)系型數(shù)據(jù)庫的事務(wù)既有相似性, 也存在區(qū)別. 因?yàn)?Redis 的架構(gòu)基于操作系統(tǒng)的多路復(fù)用的 IO 接口, 主處理流程是一個(gè)單線程, 因此對(duì)于一個(gè)完整的命令, 其處理都是原子性的, 但是如果需要將多個(gè)命令作為一個(gè)不可分割的處理序列, 就需要使用事務(wù).
Redis 事務(wù)有如下一些特點(diǎn):
事務(wù)中的命令序列執(zhí)行的時(shí)候是原子性的, 也就是說, 其不會(huì)被其他客戶端的命令中斷. 這和傳統(tǒng)的數(shù)據(jù)庫的事務(wù)的屬性是類似的.
盡管 Redis 事務(wù)中的命令序列是原子執(zhí)行的, 但是事務(wù)中的命令序列執(zhí)行可以部分成功, 這種情況下,Redis 事務(wù)不會(huì)執(zhí)行回滾操作. 這和傳統(tǒng)關(guān)系型數(shù)據(jù)庫的事務(wù)是有區(qū)別的.
盡管 Redis 有 RDB 和 AOF 兩種數(shù)據(jù)持久化機(jī)制, 但是其設(shè)計(jì)目標(biāo)是高效率的 cache 系統(tǒng). Redis 事務(wù)只保證將其命令序列中的操作結(jié)果提交到內(nèi)存中, 不保證持久化到磁盤文件. 更進(jìn)一步的, Redis 事務(wù)和 RDB 持久化機(jī)制沒有任何關(guān)系, 因?yàn)?RDB 機(jī)制是對(duì)內(nèi)存數(shù)據(jù)結(jié)構(gòu)的全量的快照. 由于 AOF 機(jī)制是一種增量持久化, 所以事務(wù)中的命令序列會(huì)提交到 AOF 的緩存中. 但是 AOF 機(jī)制將其緩存寫入磁盤文件是由其配置的實(shí)現(xiàn)策略決定的, 和 Redis 事務(wù)沒有關(guān)系.
Redis 事務(wù) API
從宏觀上來講, Redis 事務(wù)開始后, 會(huì)緩存后續(xù)的操作命令及其操作數(shù)據(jù), 當(dāng)事務(wù)提交時(shí), 原子性的執(zhí)行緩存的命令序列.
從版本 2.2 開始,Redis 提供了一種樂觀的鎖機(jī)制, 配合這種機(jī)制,Redis 事務(wù)提交時(shí), 變成了事務(wù)的條件執(zhí)行. 具體的說, 如果樂觀鎖失敗了, 事務(wù)提交時(shí), 丟棄事務(wù)中的命令序列, 如果樂觀鎖成功了, 事務(wù)提交時(shí), 才會(huì)執(zhí)行其命令序列. 當(dāng)然, 也可以不使用樂觀鎖機(jī)制, 在事務(wù)提交時(shí), 無條件執(zhí)行事務(wù)的命令序列.
Redis 事務(wù)涉及到 MULTI, EXEC, DISCARD, WATCH 和 UNWATCH 這五個(gè)命令:
事務(wù)開始的命令是 MULTI, 該命令返回 OK 提示信息. Redis 不支持事務(wù)嵌套, 執(zhí)行多次 MULTI 命令和執(zhí)行一次是相同的效果. 嵌套執(zhí)行 MULTI 命令時(shí),Redis 只是返回錯(cuò)誤提示信息.
EXEC 是事務(wù)的提交命令, 事務(wù)中的命令序列將被執(zhí)行 (或者不被執(zhí)行, 比如樂觀鎖失敗等). 該命令將返回響應(yīng)數(shù)組, 其內(nèi)容對(duì)應(yīng)事務(wù)中的命令執(zhí)行結(jié)果.
WATCH 命令是開始執(zhí)行樂觀鎖, 該命令的參數(shù)是 key(可以有多個(gè)), Redis 將執(zhí)行 WATCH 命令的客戶端對(duì)象和 key 進(jìn)行關(guān)聯(lián), 如果其他客戶端修改了這些 key, 則執(zhí)行 WATCH 命令的客戶端將被設(shè)置樂觀鎖失敗的標(biāo)志. 該命令必須在事務(wù)開始前執(zhí)行, 即在執(zhí)行 MULTI 命令前執(zhí)行 WATCH 命令, 否則執(zhí)行無效, 并返回錯(cuò)誤提示信息.
UNWATCH 命令將取消當(dāng)前客戶端對(duì)象的樂觀鎖 key, 該客戶端對(duì)象的事務(wù)提交將變成無條件執(zhí)行.
DISCARD 命令將結(jié)束事務(wù), 并且會(huì)丟棄全部的命令序列.
需要注意的是,EXEC 命令和 DISCARD 命令結(jié)束事務(wù)時(shí), 會(huì)調(diào)用 UNWATCH 命令, 取消該客戶端對(duì)象上所有的樂觀鎖 key.
無條件提交
如果不使用樂觀鎖, 則事務(wù)為無條件提交. 下面是一個(gè)事務(wù)執(zhí)行的例子:
multi +OK incr key1 +QUEUED set key2 val2 +QUEUED exec *2 :1 +OK
當(dāng)客戶端開始事務(wù)后, 后續(xù)發(fā)送的命令將被 Redis 緩存起來,Redis 向客戶端返回響應(yīng)提示字符串 QUEUED. 當(dāng)執(zhí)行 EXEC 提交事務(wù)時(shí), 緩存的命令依次被執(zhí)行, 返回命令序列的執(zhí)行結(jié)果.
事務(wù)的錯(cuò)誤處理
事務(wù)提交命令 EXEC 有可能會(huì)失敗, 有三種類型的失敗場(chǎng)景:
在事務(wù)提交之前, 客戶端執(zhí)行的命令緩存失敗. 比如命令的語法錯(cuò)誤 (命令參數(shù)個(gè)數(shù)錯(cuò)誤, 不支持的命令等等). 如果發(fā)生這種類型的錯(cuò)誤,Redis 將向客戶端返回包含錯(cuò)誤提示信息的響應(yīng).
事務(wù)提交時(shí), 之前緩存的命令有可能執(zhí)行失敗.
由于樂觀鎖失敗, 事務(wù)提交時(shí), 將丟棄之前緩存的所有命令序列.
當(dāng)發(fā)生第一種失敗的情況下, 客戶端在執(zhí)行事務(wù)提交命令 EXEC 時(shí), 將丟棄事務(wù)中所有的命令序列. 下面是一個(gè)例子:
multi +OK incr num1 num2 -ERR wrong number of arguments for incr command set key1 val1 +QUEUED exec -EXECABORT Transaction discarded because of previous errors.
命令 incr num1 num2 并沒有緩存成功, 因?yàn)?incr 命令只允許有一個(gè)參數(shù), 是個(gè)語法錯(cuò)誤的命令.Redis 無法成功緩存該命令, 向客戶端發(fā)送錯(cuò)誤提示響應(yīng). 接下來的 set key1 val1 命令緩存成功. 最后執(zhí)行事務(wù)提交的時(shí)候, 因?yàn)榘l(fā)生過命令緩存失敗, 所以事務(wù)中的所有命令序列被丟棄.
如果事務(wù)中的所有命令序列都緩存成功, 在提交事務(wù)的時(shí)候, 緩存的命令中仍可能執(zhí)行失敗. 但 Redis 不會(huì)對(duì)事務(wù)做任何回滾補(bǔ)救操作. 下面是一個(gè)這樣的例子:
multi +OK set key1 val1 +QUEUED lpop key1 +QUEUED incr num1 +QUEUED exec *3 +OK -WRONGTYPE Operation against a key holding the wrong kind of value :1
所有的命令序列都緩存成功, 但是在提交事務(wù)的時(shí)候, 命令 set key1 val1 和 incr num1 執(zhí)行成功了,Redis 保存了其執(zhí)行結(jié)果, 但是命令 lpop key1 執(zhí)行失敗了.
樂觀鎖機(jī)制
Redis 事務(wù)和樂觀鎖一起使用時(shí), 事務(wù)將成為有條件提交.
關(guān)于樂觀鎖, 需要注意的是:
WATCH 命令必須在 MULTI 命令之前執(zhí)行. WATCH 命令可以執(zhí)行多次.
WATCH 命令可以指定樂觀鎖的多個(gè) key, 如果在事務(wù)過程中, 任何一個(gè) key 被其他客戶端改變, 則當(dāng)前客戶端的樂觀鎖失敗, 事務(wù)提交時(shí), 將丟棄所有命令序列.
多個(gè)客戶端的 WATCH 命令可以指定相同的 key.
WATCH 命令指定樂觀鎖后, 可以接著執(zhí)行 MULTI 命令進(jìn)入事務(wù)上下文, 也可以在 WATCH 命令和 MULTI 命令之間執(zhí)行其他命令. 具體使用方式取決于場(chǎng)景需求, 不在事務(wù)中的命令將立即被執(zhí)行.
如果 WATCH 命令指定的樂觀鎖的 key, 被當(dāng)前客戶端改變, 在事務(wù)提交時(shí), 樂觀鎖不會(huì)失敗.
如果 WATCH 命令指定的樂觀鎖的 key 具有超時(shí)屬性, 并且該 key 在 WATCH 命令執(zhí)行后, 在事務(wù)提交命令 EXEC 執(zhí)行前超時(shí), 則樂觀鎖不會(huì)失敗. 如果該 key 被其他客戶端對(duì)象修改, 則樂觀鎖失敗.
一個(gè)執(zhí)行樂觀鎖機(jī)制的事務(wù)例子:
rpush list v1 v2 v3 :3 watch list +OK multi +OK lpop list +QUEUED exec *1 $2 v1
下面是另一個(gè)例子, 樂觀鎖被當(dāng)前客戶端改變, 事務(wù)提交成功:
watch num +OK multi +OK incr num +QUEUED exec *1 :2
Redis 事務(wù)和樂觀鎖配合使用時(shí), 可以構(gòu)造實(shí)現(xiàn)單個(gè) Redis 命令不能完成的更復(fù)雜的邏輯.
Redis 事務(wù)的源碼實(shí)現(xiàn)機(jī)制
首先, 事務(wù)開始的 MULTI 命令執(zhí)行的函數(shù)為 multiCommand, 其實(shí)現(xiàn)為 (multi.c):
void multiCommand(redisClient *c) { if (c- flags REDIS_MULTI) { addReplyError(c, MULTI calls can not be nested return; } c- flags |= REDIS_MULTI; addReply(c,shared.ok); }
該命令只是在當(dāng)前客戶端對(duì)象上加上 REDIS_MULTI 標(biāo)志, 表示該客戶端進(jìn)入了事務(wù)上下文.
客戶端進(jìn)入事務(wù)上下文后, 后續(xù)執(zhí)行的命令將被緩存. 函數(shù) processCommand 是 Redis 處理客戶端命令的入口函數(shù), 其實(shí)現(xiàn)為 (redis.c):
int processCommand(redisClient *c) { /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */ if (!strcasecmp(c- argv[0]- ptr, quit )) { addReply(c,shared.ok); c- flags |= REDIS_CLOSE_AFTER_REPLY; return REDIS_ERR; } /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c- ccmd = c- lastcmd = lookupCommand(c- argv[0]- ptr); if (!c- cmd) { flagTransaction(c); addReplyErrorFormat(c, unknown command %s , (char*)c- argv[0]- ptr); return REDIS_OK; } else if ((c- cmd- arity 0 c- cmd- arity != c- argc) || (c- argc -c- cmd- arity)) { flagTransaction(c); addReplyErrorFormat(c, wrong number of arguments for %s command , c- cmd- name); return REDIS_OK; } /* Check if the user is authenticated */ if (server.requirepass !c- authenticated c- cmd- proc != authCommand) { flagTransaction(c); addReply(c,shared.noautherr); return REDIS_OK; } /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. */ if (server.maxmemory) { int retval = freeMemoryIfNeeded(); /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return REDIS_ERR; /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions? Error. */ if ((c- cmd- flags REDIS_CMD_DENYOOM) retval == REDIS_ERR) { flagTransaction(c); addReply(c, shared.oomerr); return REDIS_OK; } } /* Don t accept write commands if there are problems persisting on disk * and if this is a master instance. */ if (((server.stop_writes_on_bgsave_err server.saveparamslen 0 server.lastbgsave_status == REDIS_ERR) || server.aof_last_write_status == REDIS_ERR) server.masterhost == NULL (c- cmd- flags REDIS_CMD_WRITE || c- cmd- proc == pingCommand)) { flagTransaction(c); if (server.aof_last_write_status == REDIS_OK) addReply(c, shared.bgsaveerr); else addReplySds(c, sdscatprintf(sdsempty(), -MISCONF Errors writing to the AOF file: %s\r\n , strerror(server.aof_last_write_errno))); return REDIS_OK; } /* Don t accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ if (server.masterhost == NULL server.repl_min_slaves_to_write server.repl_min_slaves_max_lag c- cmd- flags REDIS_CMD_WRITE server.repl_good_slaves_count server.repl_min_slaves_to_write) { flagTransaction(c); addReply(c, shared.noreplicaserr); return REDIS_OK; } /* Don t accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost server.repl_slave_ro !(c- flags REDIS_MASTER) c- cmd- flags REDIS_CMD_WRITE) { addReply(c, shared.roslaveerr); return REDIS_OK; } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ if (c- flags REDIS_PUBSUB c- cmd- proc != pingCommand c- cmd- proc != subscribeCommand c- cmd- proc != unsubscribeCommand c- cmd- proc != psubscribeCommand c- cmd- proc != punsubscribeCommand) { addReplyError(c, only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context return REDIS_OK; } /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and * we are a slave with a broken link with master. */ if (server.masterhost server.repl_state != REDIS_REPL_CONNECTED server.repl_serve_stale_data == 0 !(c- cmd- flags REDIS_CMD_STALE)) { flagTransaction(c); addReply(c, shared.masterdownerr); return REDIS_OK; } /* Loading DB? Return an error if the command has not the * REDIS_CMD_LOADING flag. */ if (server.loading !(c- cmd- flags REDIS_CMD_LOADING)) { addReply(c, shared.loadingerr); return REDIS_OK; } /* Lua script too slow? Only allow a limited number of commands. */ if (server.lua_timedout c- cmd- proc != authCommand c- cmd- proc != replconfCommand !(c- cmd- proc == shutdownCommand c- argc == 2 tolower(((char*)c- argv[1]- ptr)[0]) == n ) !(c- cmd- proc == scriptCommand c- argc == 2 tolower(((char*)c- argv[1]- ptr)[0]) == k )) { flagTransaction(c); addReply(c, shared.slowscripterr); return REDIS_OK; } /* Exec the command */ if (c- flags REDIS_MULTI c- cmd- proc != execCommand c- cmd- proc != discardCommand c- cmd- proc != multiCommand c- cmd- proc != watchCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { call(c,REDIS_CALL_FULL); if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } return REDIS_OK; }
Line145:151 當(dāng)客戶端處于事務(wù)上下文時(shí), 如果接收的是非事務(wù)命令 (MULTI, EXEC, WATCH, DISCARD), 則調(diào)用 queueMultiCommand 將命令緩存起來, 然后向客戶端發(fā)送成功響應(yīng).
在函數(shù) processCommand 中, 在緩存命令之前, 如果檢查到客戶端發(fā)送的命令不存在, 或者命令參數(shù)個(gè)數(shù)不正確等情況, 會(huì)調(diào)用函數(shù) flagTransaction 標(biāo)命令緩存失敗. 也就是說, 函數(shù) processCommand 中, 所有調(diào)用函數(shù) flagTransaction 的條件分支, 都是返回失敗響應(yīng).
緩存命令的函數(shù) queueMultiCommand 的實(shí)現(xiàn)為 (multi.c):
/* Add a new command into the MULTI commands queue */ void queueMultiCommand(redisClient *c) { multiCmd *mc; int j; c- mstate.commands = zrealloc(c- mstate.commands, sizeof(multiCmd)*(c- mstate.count+1)); mc = c- mstate.commands+c- mstate.count; mc- ccmd = c- mc- argc = c- argc; mc- argv = zmalloc(sizeof(robj*)*c- argc); memcpy(mc- argv,c- argv,sizeof(robj*)*c- argc); for (j = 0; j c- argc; j++) incrRefCount(mc- argv[j]); c- mstate.count++; }
在事務(wù)上下文中, 使用 multiCmd 結(jié)構(gòu)來緩存命令, 該結(jié)構(gòu)定義為 (redis.h):
/* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; int argc; struct redisCommand *cmd; } multiCmd;
其中 argv 字段指向命令的參數(shù)內(nèi)存地址,argc 為命令參數(shù)個(gè)數(shù), cmd 為命令描述結(jié)構(gòu), 包括名字和函數(shù)指針等.
命令參數(shù)的內(nèi)存空間已經(jīng)使用動(dòng)態(tài)分配記錄于客戶端對(duì)象的 argv 字段了, multiCmd 結(jié)構(gòu)的 argv 字段指向客戶端對(duì)象 redisClient 的 argv 即可.
無法緩存命令時(shí), 調(diào)用函數(shù) flagTransaction, 該函數(shù)的實(shí)現(xiàn)為 (multi.c):
/* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ void flagTransaction(redisClient *c) { if (c- flags REDIS_MULTI) c- flags |= REDIS_DIRTY_EXEC; }
該函數(shù)在客戶端對(duì)象中設(shè)置 REDIS_DIRTY_EXEC 標(biāo)志, 如果設(shè)置了這個(gè)標(biāo)志, 事務(wù)提交時(shí), 命令序列將被丟棄.
最后, 在事務(wù)提交時(shí), 函數(shù) processCommand 中將調(diào)用 call(c,REDIS_CALL_FULL);, 其實(shí)現(xiàn)為 (redis.c):
/* Call() is the core of Redis execution of a command */ void call(redisClient *c, int flags) { long long dirty, start, duration; int cclient_old_flags = c- flags; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) !server.loading !(c- cmd- flags (REDIS_CMD_SKIP_MONITOR|REDIS_CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c- db- id,c- argv,c- argc); } /* Call the command. */ c- flags = ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); redisOpArrayInit(server.also_propagate); dirty = server.dirty; start = ustime(); c- cmd- proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty 0) dirty = 0; /* When EVAL is called loading the AOF we don t want commands called * from Lua to go into the slowlog or to populate statistics. */ if (server.loading c- flags REDIS_LUA_CLIENT) flags = ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ if (c- flags REDIS_LUA_CLIENT server.lua_caller) { if (c- flags REDIS_FORCE_REPL) server.lua_caller- flags |= REDIS_FORCE_REPL; if (c- flags REDIS_FORCE_AOF) server.lua_caller- flags |= REDIS_FORCE_AOF; } /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ if (flags REDIS_CALL_SLOWLOG c- cmd- proc != execCommand) { char *latency_event = (c- cmd- flags REDIS_CMD_FAST) ? fast-command : command latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c- argv,c- argc,duration); } if (flags REDIS_CALL_STATS) { c- cmd- microseconds += duration; c- cmd- calls++; } /* Propagate the command into the AOF and replication link */ if (flags REDIS_CALL_PROPAGATE) { int flags = REDIS_PROPAGATE_NONE; if (c- flags REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL; if (c- flags REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF; if (dirty) flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); if (flags != REDIS_PROPAGATE_NONE) propagate(c- cmd,c- db- id,c- argv,c- argc,flags); } /* Restore the old FORCE_AOF/REPL flags, since call can be executed * recursively. */ c- flags = ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); c- flags |= client_old_flags (REDIS_FORCE_AOF|REDIS_FORCE_REPL); /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. */ if (server.also_propagate.numops) { int j; redisOp *rop; for (j = 0; j server.also_propagate.numops; j++) { rop = server.also_propagate.ops[j]; propagate(rop- cmd, rop- dbid, rop- argv, rop- argc, rop- target); } redisOpArrayFree(server.also_propagate); } server.stat_numcommands++; }
在函數(shù) call 中通過執(zhí)行 c - cmd- proc(c); 調(diào)用具體的命令函數(shù). 事務(wù)提交命令 EXEC 對(duì)應(yīng)的執(zhí)行函數(shù)為 execCommand, 其實(shí)現(xiàn)為 (multi.c):
void execCommand(redisClient *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ if (!(c- flags REDIS_MULTI)) { addReplyError(c, EXEC without MULTI return; } /* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ if (c- flags (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { addReply(c, c- flags REDIS_DIRTY_EXEC ? shared.execaborterr : shared.nullmultibulk); discardTransaction(c); goto handle_monitor; } /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we ll waste CPU cycles */ orig_argv = c- argv; orig_argc = c- argc; orig_cmd = c- addReplyMultiBulkLen(c,c- mstate.count); for (j = 0; j c- mstate.count; j++) { c- argc = c- mstate.commands[j].argc; c- argv = c- mstate.commands[j].argv; c- ccmd = c- mstate.commands[j].cmd; /* Propagate a MULTI request once we encounter the first write op. * This way we ll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ if (!must_propagate !(c- cmd- flags REDIS_CMD_READONLY)) { execCommandPropagateMulti(c); must_propagate = 1; } call(c,REDIS_CALL_FULL); /* Commands may alter argc/argv, restore mstate. */ c- mstate.commands[j].argc = c- argc; c- mstate.commands[j].argv = c- argv; c- mstate.commands[j].cmd = c- } c- argv = orig_argv; c- argc = orig_argc; c- cmd = orig_cmd; discardTransaction(c); /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ if (must_propagate) server.dirty++; handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here * since the natural order of commands execution is actually: * MUTLI, EXEC, ... commands inside transaction ... * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command * table, and we do it here with correct ordering. */ if (listLength(server.monitors) !server.loading) replicationFeedMonitors(c,server.monitors,c- db- id,c- argv,c- argc); }
LINE8:11 檢查 EXEC 命令和 MULTI 命令是否配對(duì)使用, 單獨(dú)執(zhí)行 EXEC 命令是沒有意義的.
LINE19:24 檢查客戶端對(duì)象是否具有 REDIS_DIRTY_CAS 或者 REDIS_DIRTY_EXEC 標(biāo)志, 如果存在, 則調(diào)用函數(shù) discardTransaction 丟棄命令序列, 向客戶端返回失敗響應(yīng).
如果沒有檢查到任何錯(cuò)誤, 則先執(zhí)行 unwatchAllKeys(c); 取消該客戶端上所有的樂觀鎖 key.
LINE32:52 依次執(zhí)行緩存的命令序列, 這里有兩點(diǎn)需要注意的是:
事務(wù)可能需要同步到 AOF 緩存或者 replica 備份節(jié)點(diǎn)中. 如果事務(wù)中的命令序列都是讀操作, 則沒有必要向 AOF 和 replica 進(jìn)行同步. 如果事務(wù)的命令序列中包含寫命令, 則 MULTI, EXEC 和相關(guān)的寫命令會(huì)向 AOF 和 replica 進(jìn)行同步. 根據(jù) LINE41:44 的條件判斷, 執(zhí)行 execCommandPropagateMulti(c); 保證 MULTI 命令同步, LINE59 檢查 EXEC 命令是否需要同步, 即 MULTI 命令和 EXEC 命令必須保證配對(duì)同步.EXEC 命令的同步執(zhí)行在函數(shù)的 call 中 LINE62propagate(c- cmd,c- db- id,c- argv,c- argc,flags);, 具體的寫入命令由各自的執(zhí)行函數(shù)負(fù)責(zé)同步.
這里執(zhí)行命令序列時(shí), 通過執(zhí)行 call(c,REDIS_CALL_FULL); 所以 call 函數(shù)是遞歸調(diào)用.
所以, 綜上所述, Redis 事務(wù)其本質(zhì)就是, 以不可中斷的方式依次執(zhí)行緩存的命令序列, 將結(jié)果保存到內(nèi)存 cache 中.
事務(wù)提交時(shí), 丟棄命令序列會(huì)調(diào)用函數(shù) discardTransaction, 其實(shí)現(xiàn)為 (multi.c):
void discardTransaction(redisClient *c) { freeClientMultiState(c); initClientMultiState(c); c- flags = ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); unwatchAllKeys(c); }
該函數(shù)調(diào)用 freeClientMultiState 釋放 multiCmd 對(duì)象內(nèi)存. 調(diào)用 initClientMultiState 復(fù)位客戶端對(duì)象的緩存命令管理結(jié)構(gòu). 調(diào)用 unwatchAllKeys 取消該客戶端的樂觀鎖.
WATCH 命令執(zhí)行樂觀鎖, 其對(duì)應(yīng)的執(zhí)行函數(shù)為 watchCommand, 其實(shí)現(xiàn)為 (multi.c):
void watchCommand(redisClient *c) { int j; if (c- flags REDIS_MULTI) { addReplyError(c, WATCH inside MULTI is not allowed return; } for (j = 1; j c- argc; j++) watchForKey(c,c- argv[j]); addReply(c,shared.ok); }
進(jìn)而調(diào)用函數(shù) watchForKey, 其實(shí)現(xiàn)為 (multi.c):
/* Watch for the specified key */ void watchForKey(redisClient *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; watchedKey *wk; /* Check if we are already watching for this key */ listRewind(c- watched_keys, li); while((ln = listNext( li))) { wk = listNodeValue(ln); if (wk- db == c- db equalStringObjects(key,wk- key)) return; /* Key already watched */ } /* This key is not already watched in this DB. Let s add it */ clients = dictFetchValue(c- db- watched_keys,key); if (!clients) { clients = listCreate(); dictAdd(c- db- watched_keys,key,clients); incrRefCount(key); } listAddNodeTail(clients,c); /* Add the new key to the list of keys watched by this client */ wk = zmalloc(sizeof(*wk)); wk- keykey = key; wk- db = c- incrRefCount(key); listAddNodeTail(c- watched_keys,wk); }
關(guān)于樂觀鎖的 key, 既保存于其客戶端對(duì)象的 watched_keys 鏈表中, 也保存于全局?jǐn)?shù)據(jù)庫對(duì)象的 watched_keys 哈希表中.
LINE10:14 檢查客戶端對(duì)象的鏈表中是否已經(jīng)存在該 key, 如果已經(jīng)存在, 則直接返回.LINE16 在全局?jǐn)?shù)據(jù)庫中返回該 key 對(duì)應(yīng)的客戶端對(duì)象鏈表, 如果鏈表不存在, 說明其他客戶端沒有使用該 key 作為樂觀鎖, 如果鏈表存在, 說明其他客戶端已經(jīng)使用該 key 作為樂觀鎖. LINE22 將當(dāng)前客戶端對(duì)象記錄于該 key 對(duì)應(yīng)的鏈表中. LINE28 將該 key 記錄于當(dāng)前客戶端的 key 鏈表中.
當(dāng)前客戶端執(zhí)行樂觀鎖以后, 其他客戶端的寫入命令可能修改該 key 值. 所有具有寫操作屬性的命令都會(huì)執(zhí)行函數(shù) signalModifiedKey, 其實(shí)現(xiàn)為 (db.c):
void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key); }
函數(shù) touchWatchedKey 的實(shí)現(xiàn)為 (multi.c):
/* Touch a key, so that if this key is being WATCHed by some client the * next EXEC will fail. */ void touchWatchedKey(redisDb *db, robj *key) { list *clients; listIter li; listNode *ln; if (dictSize(db- watched_keys) == 0) return; clients = dictFetchValue(db- watched_keys, key); if (!clients) return; /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ /* Check if we are already watching for this key */ listRewind(clients, li); while((ln = listNext( li))) { redisClient *c = listNodeValue(ln); c- flags |= REDIS_DIRTY_CAS; } }
語句 if (dictSize(db- watched_keys) == 0) return; 檢查全局?jǐn)?shù)據(jù)庫中的哈希表 watched_keys 是否為空, 如果為空, 說明沒有任何客戶端執(zhí)行 WATCH 命令, 直接返回. 如果該哈希表不為空, 取回該 key 對(duì)應(yīng)的客戶端鏈表結(jié)構(gòu), 并把該鏈表中的每個(gè)客戶端對(duì)象設(shè)置 REDIS_DIRTY_CAS 標(biāo)志. 前面在 EXEC 的執(zhí)行命令中, 進(jìn)行過條件判斷, 如果客戶端對(duì)象具有這個(gè)標(biāo)志, 則丟棄事務(wù)中的命令序列.
在執(zhí)行 EXEC, DISCARD, UNWATCH 命令以及在客戶端結(jié)束連接的時(shí)候, 都會(huì)取消樂觀鎖, 最終都會(huì)執(zhí)行函數(shù) unwatchAllKeys, 其實(shí)現(xiàn)為 (multi.c):
/* Unwatch all the keys watched by this client. To clean the EXEC dirty * flag is up to the caller. */ void unwatchAllKeys(redisClient *c) { listIter li; listNode *ln; if (listLength(c- watched_keys) == 0) return; listRewind(c- watched_keys, li); while((ln = listNext( li))) { list *clients; watchedKey *wk; /* Lookup the watched key - clients list and remove the client * from the list */ wk = listNodeValue(ln); clients = dictFetchValue(wk- db- watched_keys, wk- key); redisAssertWithInfo(c,NULL,clients != NULL); listDelNode(clients,listSearchKey(clients,c)); /* Kill the entry at all if this was the only client */ if (listLength(clients) == 0) dictDelete(wk- db- watched_keys, wk- key); /* Remove this watched key from the client- watched list */ listDelNode(c- watched_keys,ln); decrRefCount(wk- key); zfree(wk); } }
語句 if (listLength(c- watched_keys) == 0) return; 判斷如果當(dāng)前客戶端對(duì)象的 watched_keys 鏈表為空, 說明當(dāng)前客戶端沒有執(zhí)行 WATCH 命令, 直接返回. 如果該鏈表非空, 則依次遍歷該鏈表中的 key, 并從該鏈表中刪除 key, 同時(shí), 獲得全局?jǐn)?shù)據(jù)庫中的哈希表 watched_keys 中該 key 對(duì)應(yīng)的客戶端鏈表, 刪除當(dāng)前客戶端對(duì)象.
到此,相信大家對(duì)“如何深入理解 Redis 事務(wù)”有了更深的了解,不妨來實(shí)際操作一番吧!這里是丸趣 TV 網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!