久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

python如何使用redis做隊列服務

190次閱讀
沒有評論

共計 5144 個字符,預計需要花費 13 分鐘才能閱讀完成。

這篇文章給大家介紹 python 如何使用 redis 做隊列服務,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

系統中引入消息隊列機制是對系統一個非常大的改善。例如一個 web 系統中,用戶做了某項操作后需要發送郵件通知到用戶郵箱中。你可以使用同步方式讓用戶等待郵件發送完成后反饋給用戶,但是這樣可能會因為網絡的不確定性造成用戶長時間的等待從而影響用戶體驗。

有些場景下是不可能使用同步方式等待完成的,那些需要后臺花費大量時間的操作。例如極端例子,一個在線編譯系統任務,后臺編譯完成需要 30 分鐘。這種場景的設計不可能同步等待后在回饋,必須是先反饋用戶隨后異步處理完成,再等待處理完成后根據情況再此反饋用戶與否。

另外適用消息隊列的情況是那些系統處理能力有限的情況下,先使用隊列機制把任務暫時存放起來,系統再一個個輪流處理掉排隊的任務。這樣在系統吞吐量不足的情況下也能穩定的處理掉高并發的任務。

消息隊列可以用來做排隊機制,只要系統需要用到排隊機制的地方就可以使用消息隊列來作。

rabbitmq 的優先級做法

目前成熟的消息隊列產品有很多,著名的例如 rabbitmq。它使用起來相對還是比較簡單的,功能也相對比較豐富,一般場合下是完全夠用的。但是有個很煩人的就是它不支持優先級。例如一個發郵件的任務,某些特權用戶希望它的郵件能夠更加及時的發送出去,至少比普通用戶要優先對待。默認情況下 rabbitmq 是無法處理掉的,扔給 rabbitmq 的任務都是 FIFO 先進先出。但是我們可以使用一些變通的技巧來支持這些優先級。創建多個隊列,并為 rabbitmq 的消費者設置相應的路由規則。

例如默認情況下有這樣一個隊列,我們拿 list 來模擬 [task1, task2, task3],消費者輪流按照 FIFO 的原則一個個拿出 task 來處理掉。如果有高優先級的任務進來,它也只能跟在最后被處理 [task1, task2, task3, higitask1]. 但是如果使用兩個隊列,一個高優先級隊列,一個普通優先級隊列。普通優先級 [task1, task2, task3], 高優先級 [hightask1] 然后我們設置消費者的路由讓消費者隨機從任意隊列中取數據即可。

并且我們可以定義一個專門處理高優先級隊列的消費者,它空閑的時候也不處理低優先級隊列的數據。這類似銀行的 VIP 柜臺,普通客戶在銀行取號排隊,一個 VIP 來了他雖然沒有從取號機里拿出一個排在普通會員前面的票,但是他還是可以更快地直接走 VIP 通道。

使用 rabbitmq 來做支持優先級的消息隊列的話,就像是上面所述同銀行 VIP 會員一樣,走不同的通道。但是這種方式只是相對的優先級,做不到絕對的優先級控制,例如我希望某一個優先級高的任務在絕對意義上要比其他普通任務優先處理掉,這樣上面的方案是行不通的。因為 rabbitmq 的消費者只知道再自己空閑的情況下從自己關心的隊列中“隨機”取某一個隊列里面的第一個數據來處理,它沒法控制優先取找哪一個隊列。或者更加細粒度的優先級控制。或者你系統里面設置的優先級有 10 多種。這樣使用 rabbitmq 也是很難實現的。

但是如果使用 redis 來做隊列的話上面的需求都可以實現。

使用 redis 怎么做消息隊列

首先 redis 它的設計是用來做緩存的,但是由于它自身的某種特性使得他可以用來做消息隊列。它有幾個阻塞式的 API 可以使用,正是這些阻塞式的 API 讓他有做消息隊列的能力。

試想一下在”數據庫解決所有問題“的思路下,不使用消息隊列也是可以完成你的需求的。我們把任務全部存放在數據庫然后通過不斷的輪詢方式來取任務處理。這種做法雖然可以完成你的任務但是做法很粗劣。但是如果你的數據庫接口提供一個阻塞的方法那么就可以避免輪詢操作了,你的數據庫也可以用來做消息隊列,只不過目前的數據庫還沒有這樣的接口。另外做消息隊列的其他特性例如 FIFO 也很容易實現,只需要一個 List 對象從頭取數據,從尾部塞數據即可實現。

redis 能做消息隊列得益于他 list 對象 blpop brpop 接口以及 Pub/Sub(發布 / 訂閱) 的某些接口。他們都是阻塞版的,所以可以用來做消息隊列。

redis 消息隊列優先級的實現

一些基礎 redis 基礎知識的說明

redis  blpop tasklist 0
 im task 01

這個例子使用 blpop 命令會阻塞方式地從 tasklist 列表中取頭一個數據,最后一個參數就是等待超時的時間。如果設置為 0 則表示無限等待。另外 redis 存放的數據都只能是 string 類型,所以在任務傳遞的時候只能是傳遞字符串。我們只需要簡單的將負責數據序列化成 json 格式的字符串,然后消費者那邊再轉換一下即可。

這里我們的示例語言使用 python,鏈接 redis 的庫使用 redis-py. 如果你有些編程基礎把它切換成自己喜歡的語言應該是沒問題的。

1 簡單的 FIFO 隊列

import redis, time

def handle(info):

 print info

 time.sleep(20)

def main():

 pool = redis.ConnectionPool(host= localhost , port=6379, db=0)

 r = redis.Redis(connection_pool=pool)

 while 1:

 result = r.brpop(task , 0)            

 handle(result[1])

if __name__ == __main__ :

 main()

上例子即使一個最簡單的消費者,我們通過一個無限循環不斷地從 redis 的隊列中取數據。如果隊列中沒有數據則沒有超時的阻塞在那里,有數據則取出往下執行。

一般情況取出來是個復雜的字符串,我們可能需要將其格式化后作為再傳給處理函數, 但是為了簡單我們的例子就是一個普通字符串。另外例子中的處理函數不做任何處理,僅僅 sleep 用來模擬耗時的操作。

我們另開一個 redis 的客戶端來模擬生產者,自帶的客戶端就可以。多往 tasklist 隊列里面塞上一些數據。

redis 127.0.0.1:6379 LPUSH task fuckin1

(integer) 1

redis 127.0.0.1:6379 LPUSH task fuckin2

(integer) 1

redis 127.0.0.1:6379 LPUSH task fuckin3

(integer) 2

redis 127.0.0.1:6379 LPUSH task fuckin4

(integer) 3

redis 127.0.0.1:6379 LPUSH task fuckin5

(integer) 4

redis 127.0.0.1:6379 LPUSH task fuckin6

(integer) 5

redis 127.0.0.1:6379 LPUSH task fuckin7

(integer) 6

redis 127.0.0.1:6379 LPUSH task fuckin8

(integer) 7

redis 127.0.0.1:6379 LPUSH task fuckin10

(integer) 8

redis 127.0.0.1:6379 lrange task 0 -1

1) fuckin10

2) fuckin8

3) fuckin7

4) fuckin6

5) fuckin5

6) fuckin4

7) fuckin3

可以看到
[root@host-192-168-1-56 soft]# python duilie.py 
(task , fuckin1)
fuckin1
(task , fuckin2)    — 每個任務之間間隔 20 秒,20 秒是模擬任務執行時間
fuckin2
(task , fuckin3)
fuckin3
(task , fuckin4)
fuckin4
(task , fuckin5)
.。
。。。
。。。
(task , fuckin10)
fuckin10
。。。等待狀態,等待新的任務

2. 簡單優先級的隊列

假設一種簡單的需求,只需要高優先級的比低優先級的任務率先處理掉。其他任務之間的順序一概不管,這種我們只需要在在遇到高優先級任務的時候將它塞到隊列的前頭,而不是 push 到最后面即可。

因為我們的隊列是使用的 redis 的 list, 所以很容易實現。遇到高優先級的使用 rpush 遇到低優先級的使用 lpush

redis  lpush tasklist  im task 01 
redis  lpush tasklist  im task 02 
redis  rpush tasklist  im high task 01 
redis  rpush tasklist  im high task 01 
redis  lpush tasklist  im task 03 
redis  rpush tasklist  im high task 03

隨后會看到,高優先級的總是比低優先級的率先執行。但是這個方案的缺點是高優先級的任務之間的執行順序是先進后出的。

3. 較為完善的隊列

例子 2 中只是簡單的將高優先級的任務塞到隊列最前面,低優先級的塞到最后面。這樣保證不了高優先級任務之間的順序。

假設當所有的任務都是高優先級的話,那么他們的執行順序將是相反的。這樣明顯違背了隊列的 FIFO 原則。

不過只要稍加改進就可以完善我們的隊列。

跟使用 rabbitmq 一樣,我們設置兩個隊列,一個高優先級一個低優先級的隊列。高優先級任務放到高隊列中,低的放在低優先隊列中。redis 和 rabbitmq 不同的是它可以要求隊列消費者從哪個隊列里面先讀。

def main():
 pool = redis.ConnectionPool(host= localhost , port=6379, db=0)
 r = redis.Redis(connection_pool=pool)
 while 1:
 result = r.brpop([high_task_queue ,  low_task_queue], 0)
 handle(result[1])

上面的代碼,會阻塞地從 high_task_queue , low_task_queue 這兩個隊列里面取數據,如果第一個沒有再從第二個里面取。所以只需要將隊列消費者做這樣的改進便可以達到目的。

redis  lpush low_task_queue low001
redis  lpush low_task_queue low002
redis  lpush low_task_queue low003
redis  lpush low_task_queue low004
redis  lpush high_task_queue low001
redis  lpush high_task_queue low002
redis  lpush high_task_queue low003
redis  lpush high_task_queue low004

通過上面的測試看到,高優先級的會被率先執行,并且高優先級之間也是保證了 FIFO 的原則。

這種方案我們可以支持不同階段的優先級隊列,例如高中低三個級別或者更多的級別都可以。

4. 優先級級別很多的情況

假設有個這樣的需求,優先級不是簡單的高中低或者 0 -10 這些固定的級別。而是類似 0 -99999 這么多級別。那么我們第三種方案將不太合適了。雖然 redis 有 sorted set 這樣的可以排序的數據類型,看是很可惜它沒有阻塞版的接口。于是我們還是只能使用 list 類型通過其他方式來完成目的。

有個簡單的做法我們可以只設置一個隊列,并保證它是按照優先級排序號的。然后通過二分查找法查找一個任務合適的位置,并通過 lset 命令插入到相應的位置。例如隊列里面包含著寫優先級的任務 [1, 3, 6, 8, 9, 14],當有個優先級為 7 的任務過來,我們通過自己的二分算法一個個從隊列里面取數據出來反和目標數據比對,計算出相應的位置然后插入到指定地點即可。

因為二分查找是比較快的,并且 redis 本身也都在內存中,理論上速度是可以保證的。但是如果說數據量確實很大的話我們也可以通過一些方式來調優。

回想我們第三種方案,把第三種方案結合起來就會很大程度上減少開銷。例如數據量十萬的隊列,它們的優先級也是隨機 0 - 十萬的區間。我們可以設置 10 個或者 100 個不同的隊列,0- 一萬的優先級任務投放到 1 號隊列,一萬 - 二萬的任務投放到 2 號隊列。這樣將一個隊列按不同等級拆分后它單個隊列的數據就減少許多,這樣二分查找匹配的效率也會高一點。但是數據所占的資源基本是不變的,十萬數據該占多少內存還是多少。只是系統里面多了一些隊列而已。

關于 python 如何使用 redis 做隊列服務就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-07-27發表,共計5144字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 黔东| 广灵县| 平武县| 北海市| 疏勒县| 壶关县| 道真| 梓潼县| 古田县| 宁国市| 娄烦县| 多伦县| 边坝县| 响水县| 库车县| 新乡市| 祁连县| 正阳县| 巴马| 徐州市| 绥宁县| 唐山市| 浦城县| 仁寿县| 克拉玛依市| 裕民县| 蚌埠市| 崇义县| 沁阳市| 伊川县| 济南市| 漠河县| 石城县| 长治县| 甘德县| 岳普湖县| 卓尼县| 百色市| 台州市| 沙雅县| 枣庄市|