久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

基于環狀隊列和迭代器如何實現分布式任務RR分配策略

164次閱讀
沒有評論

共計 6353 個字符,預計需要花費 16 分鐘才能閱讀完成。

這期內容當中丸趣 TV 小編將會給大家帶來有關基于環狀隊列和迭代器如何實現分布式任務 RR 分配策略,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

# 背景

## 分布式任務分配

在很多運維場景下,我們都會執行一些長時間的任務,比如裝機、部署環境、打包鏡像等長時間任務,而通常我們的任務節點數量通常是有限的 (排除基于 k8s 的 hpa、或者 knative 等自動伸縮場景)。

那么當我們有一個任務如何根據當前的 worker 和 corrdinator 和任務來進行合理的分配,分配其實也比較復雜,往復雜里面做,可以根據當前系統的負載、每個任務的執行資源消耗、當前集群的任務數量等,這里我們就搞一個最簡單的,基于任務和當前 worker 的 RR 算法

## 系統架構

在 worker 和任務隊列之間,添加一層協調調度層 Coordinator,由它來根據當前集群任務的狀態來進行任務的分配,同時感知當前集群 worker 和 task 的狀態,協調整個集群任務的執行、終止等操作

# 單機實現

## 整體設計

members: 表示當前集群中所有的 worker

tasks: 就是當前的任務

Coordinator: 就是我們的協調者,負責根據 members 和 tasks 進行任務的分配

result: 就是分配的結果

## CircularIterator

CircularIterator 就是我們的環狀對立迭代器, 擁有兩個方法,一個是 add 添加 member, 一個 Next 返回基于 rr 的下一個 member

“`go

// CircularIterator 環狀迭代器

type CircularIterator struct {

list []interface{}    // 保存所有的成員變量

next int

}

// Next 返回下一個元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任務

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

“`

## Member Task

Member 就是負責執行任務的 worker, 有一個 AddTask 方法和 Execute 方法負責任務的執行和添加任務

Task 標識一個任務

“`go

// Member 任務組成員

type Member struct {

id    int

tasks []*Task

}

// ID 返回當前 memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 為 member 添加任務

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 執行任務

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf(Member %d run task %s\n , m.ID(), task.Execute())

}

}

// Task 任務

type Task struct {

name string

}

// Execute 執行 task 返回結果

func (t *Task) Execute() string {

return Task + t.name + run success

}

“`

## Coordinator

Coordinator 是協調器,負責根據 Member 和 task 進行集群任務的協調調度

“`go

// Task 任務

type Task struct {

name string

}

// Execute 執行 task 返回結果

func (t *Task) Execute() string {

return Task + t.name + run success

}

// Coordinator 協調者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 為 member 分配任務

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 構建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通過當前成員, 構造成員隊列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加 member 組成員

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任務

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

“`

## 測試

我們首先創建一堆 member 和 task, 然后調用 coordinator 進行任務分配,執行任務結果

“`go

coordinator := NewCoordinator()

for i := 0; i i++ {

m := Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i i++ {

t := Task{name: fmt.Sprintf( task %d , i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

“`

## 結果

可以看到每個 worker 均勻的得到任務分配

“`bash

Member 6 run task Task task 6 run success

Member 6 run task Task task 16 run success

Member 6 run task Task task 26 run success

Member 8 run task Task task 8 run success

Member 8 run task Task task 18 run success

Member 8 run task Task task 28 run success

Member 0 run task Task task 0 run success

Member 0 run task Task task 10 run success

Member 0 run task Task task 20 run success

Member 3 run task Task task 3 run success

Member 3 run task Task task 13 run success

Member 3 run task Task task 23 run success

Member 4 run task Task task 4 run success

Member 4 run task Task task 14 run success

Member 4 run task Task task 24 run success

Member 7 run task Task task 7 run success

Member 7 run task Task task 17 run success

Member 7 run task Task task 27 run success

Member 9 run task Task task 9 run success

Member 9 run task Task task 19 run success

Member 9 run task Task task 29 run success

Member 1 run task Task task 1 run success

Member 1 run task Task task 11 run success

Member 1 run task Task task 21 run success

Member 2 run task Task task 2 run success

Member 2 run task Task task 12 run success

Member 2 run task Task task 22 run success

Member 5 run task Task task 5 run success

Member 5 run task Task task 15 run success

Member 5 run task Task task 25 run success

“`

## 完整代碼

“`go

package main

import fmt

// CircularIterator 環狀迭代器

type CircularIterator struct {

list []interface{}

next int

}

// Next 返回下一個元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任務

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

// Member 任務組成員

type Member struct {

id    int

tasks []*Task

}

// ID 返回當前 memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 為 member 添加任務

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 執行任務

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf(Member %d run task %s\n , m.ID(), task.Execute())

}

}

// Task 任務

type Task struct {

name string

}

// Execute 執行 task 返回結果

func (t *Task) Execute() string {

return Task + t.name + run success

}

// Coordinator 協調者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 為 member 分配任務

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 構建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通過當前成員, 構造成員隊列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加 member 組成員

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任務

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

// NewCircularIterftor 返回迭代器

func NewCircularIterftor(list []interface{}) *CircularIterator {

iterator := CircularIterator{}

for _, item := range list {

iterator.Add(item)

}

return iterator

}

// NewCoordinator 返回協調器

func NewCoordinator() *Coordinator {

c := Coordinator{}

return c

}

func main() {

coordinator := NewCoordinator()

for i := 0; i i++ {

m := Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i i++ {

t := Task{name: fmt.Sprintf( task %d , i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

}

“`

任務協調是一個非常復雜的事情,內部的任務平臺,雖然實現了基于任務的組合和 app 化,但是任務調度分配著一塊,仍然沒有去做,只是簡單的根據樹形任務去簡單的做一些分支任務的執行,未來有時間再做吧,要繼續研究下一個模塊了。

上述就是丸趣 TV 小編為大家分享的基于環狀隊列和迭代器如何實現分布式任務 RR 分配策略了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計6353字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 新民市| 神农架林区| 舞钢市| 桂阳县| 黎平县| 西乡县| 青川县| 隆昌县| 长汀县| 哈密市| 枣庄市| 尼玛县| 奉贤区| 鹤山市| 邮箱| 垦利县| 稻城县| 荃湾区| 望都县| 九台市| 栾城县| 呼玛县| 通化县| 竹山县| 莱州市| 保德县| 冀州市| 堆龙德庆县| 青龙| 荣昌县| 清镇市| 长海县| 内江市| 遂昌县| 阿图什市| 岳阳县| 海原县| 江西省| 德兴市| 仁怀市| 微博|