久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

利用Redis流怎么實現一個消息隊列

135次閱讀
沒有評論

共計 2941 個字符,預計需要花費 8 分鐘才能閱讀完成。

自動寫代碼機器人,免費開通

利用 Redis 流怎么實現一個消息隊列?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

代碼清單 10-1 展示了一個具有基本功能的消息隊列實現:

代碼最開頭的是幾個轉換函數,它們負責對程序的相關輸入輸出進行轉換和格式化;

MessageQueue 類用于實現消息隊列,它的添加消息、移除消息以及返回消息數量三個方法分別使用了流的  XADD 命令、 XDEL 命令和  XLEN 命令;

消息隊列的兩個獲取方法  get_message() 和  get_by_range() 分別以兩種形式調用了流的  XRANGE 命令;

最后,用于迭代消息的  iterate() 方法使用了  XREAD 命令對流進行迭代。

代碼清單 10-1 使用 Redis 流實現的消息隊列: /stream/message_queue.py

def reconstruct_message_list(message_list):
  
  為了讓多條消息能夠以更結構化的方式返回給調用者,  將  Redis  返回的多條消息從原來的格式: [(id1, {k1:v1, k2:v2, ...}), (id2, {k1:v1, k2:v2, ...}), ...]
  轉換成以下格式: [{id1: {k1:v1, k2:v2, ...}}, {id2: {k1:v1, k2:v2, ...}}, ...]
   result = []
 for id, kvs in message_list:
 result.append({id: kvs})
 return result
def get_message_from_nested_list(lst):
  
  從嵌套列表中取出消息本體。  
 return lst[0][1]
class MessageQueue:
  
  使用  Redis  流實現的消息隊列。  
 def __init__(self, client, stream_key):
 self.client = client
 self.stream = stream_key
 def add_message(self, key_value_pairs):
  
  將給定的鍵值對存入到消息里面,并返回相應的消息  ID 。  
 return self.client.xadd(self.stream, key_value_pairs)
 def get_message(self, message_id):
  
  根據給定的消息  ID  返回相應的消息,如果消息不存在則返回  None 。  
 reply = self.client.xrange(self.stream, message_id, message_id)
 if len(reply) == 1:
 return get_message_from_nested_list(reply)
 def remove_message(self, message_id):
  
  根據給定的消息  ID  刪除相應的消息,如果消息不存在則忽略該動作。  
 self.client.xdel(self.stream, message_id)
 def len(self):
  
  返回消息隊列的長度。  
 return self.client.xlen(self.stream)
 def get_by_range(self, start_id, end_id, max_item=10):
  
  根據給定的  ID  區間范圍返回隊列中的消息。  
 reply = self.client.xrange(self.stream, start_id, end_id, max_item)
 return reconstruct_message_list(reply)
 def iterate(self, start_id=0, max_item=10):
  
  對消息隊列進行迭代,返回最多  N  條大于給定  ID  的消息。  
 reply = self.client.xread({self.stream: start_id}, max_item)
 if len(reply) == 0:
 return list()
 else:
 messages = get_message_from_nested_list(reply)
 return reconstruct_message_list(messages)

對于這個消息隊列實現,我們可以通過執行以下代碼,創建出它的實例:

 from redis import Redis
  from message_queue import MessageQueue
  client = Redis(decode_responses=True)
  mq = MessageQueue(client,  mq)

然后通過執行以下代碼,向隊列里面添加十條消息:

 for i in range(10):
... key =  key{0} .format(i)
... value =  value{0} .format(i)
... msg = {key:value}
... mq.add_message(msg)
 1554113926280-0 
 1554113926280-1 
 1554113926281-0 
 1554113926281-1 
 1554113926281-2 
 1554113926281-3 
 1554113926281-4 
 1554113926281-5 
 1554113926281-6 
 1554113926282-0

還可以根據 ID 獲取指定的消息,又或者使用  get_by_range() 方法同時獲取多條消息:

 mq.get_message(1554113926280-0)
{key0 :  value0}
  mq.get_message(1554113926280-1)
{key1 :  value1}
  mq.get_by_range(- ,  + , 3)
[{1554113926280-0 : { key0 :  value0}}, {1554113926280-1 : { key1 :  value1}}, {1554113926281-0 : { key2 :  value2}}]

又或者使用  iterate() 方法對消息隊列進行迭代,等等:

 mq.iterate(0, 3)
[{1554113926280-0 : { key0 :  value0}}, {1554113926280-1 : { key1 :  value1}}, {1554113926281-0 : { key2 :  value2}}]
  mq.iterate(1554113926281-0 , 3)
[{1554113926281-1 : { key3 :  value3}}, {1554113926281-2 : { key4 :  value4}}, {1554113926281-3 : { key5 :  value5}}]

看完上述內容,你們掌握利用 Redis 流怎么實現一個消息隊列的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!

向 AI 問一下細節

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-12-04發表,共計2941字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 郧西县| 乌什县| 苏尼特右旗| 定安县| 闻喜县| 阿拉善右旗| 嘉义县| 佛教| 凤山市| 稻城县| 崇信县| 汝南县| 句容市| 离岛区| 吴旗县| 甘德县| 永新县| 樟树市| 盐津县| 乌恰县| 突泉县| 禹州市| 图木舒克市| 威远县| 朔州市| 吐鲁番市| 弋阳县| 宝鸡市| 昌黎县| 仙居县| 会理县| 满洲里市| 宝应县| 星子县| 建德市| 荣成市| 尤溪县| 迁安市| 屯昌县| 通道| 宜都市|