久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

基于環(huán)狀隊(duì)列和迭代器如何實(shí)現(xiàn)分布式任務(wù)RR分配策略

共計(jì) 6353 個(gè)字符,預(yù)計(jì)需要花費(fèi) 16 分鐘才能閱讀完成。

這期內(nèi)容當(dāng)中丸趣 TV 小編將會(huì)給大家?guī)碛嘘P(guān)基于環(huán)狀隊(duì)列和迭代器如何實(shí)現(xiàn)分布式任務(wù) RR 分配策略,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

# 背景

## 分布式任務(wù)分配

在很多運(yùn)維場景下,我們都會(huì)執(zhí)行一些長時(shí)間的任務(wù),比如裝機(jī)、部署環(huán)境、打包鏡像等長時(shí)間任務(wù),而通常我們的任務(wù)節(jié)點(diǎn)數(shù)量通常是有限的 (排除基于 k8s 的 hpa、或者 knative 等自動(dòng)伸縮場景)。

那么當(dāng)我們有一個(gè)任務(wù)如何根據(jù)當(dāng)前的 worker 和 corrdinator 和任務(wù)來進(jìn)行合理的分配,分配其實(shí)也比較復(fù)雜,往復(fù)雜里面做,可以根據(jù)當(dāng)前系統(tǒng)的負(fù)載、每個(gè)任務(wù)的執(zhí)行資源消耗、當(dāng)前集群的任務(wù)數(shù)量等,這里我們就搞一個(gè)最簡單的,基于任務(wù)和當(dāng)前 worker 的 RR 算法

## 系統(tǒng)架構(gòu)

在 worker 和任務(wù)隊(duì)列之間,添加一層協(xié)調(diào)調(diào)度層 Coordinator,由它來根據(jù)當(dāng)前集群任務(wù)的狀態(tài)來進(jìn)行任務(wù)的分配,同時(shí)感知當(dāng)前集群 worker 和 task 的狀態(tài),協(xié)調(diào)整個(gè)集群任務(wù)的執(zhí)行、終止等操作

# 單機(jī)實(shí)現(xiàn)

## 整體設(shè)計(jì)

members: 表示當(dāng)前集群中所有的 worker

tasks: 就是當(dāng)前的任務(wù)

Coordinator: 就是我們的協(xié)調(diào)者,負(fù)責(zé)根據(jù) members 和 tasks 進(jìn)行任務(wù)的分配

result: 就是分配的結(jié)果

## CircularIterator

CircularIterator 就是我們的環(huán)狀對(duì)立迭代器, 擁有兩個(gè)方法,一個(gè)是 add 添加 member, 一個(gè) Next 返回基于 rr 的下一個(gè) member

“`go

// CircularIterator 環(huán)狀迭代器

type CircularIterator struct {

list []interface{}    // 保存所有的成員變量

next int

}

// Next 返回下一個(gè)元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任務(wù)

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

“`

## Member Task

Member 就是負(fù)責(zé)執(zhí)行任務(wù)的 worker, 有一個(gè) AddTask 方法和 Execute 方法負(fù)責(zé)任務(wù)的執(zhí)行和添加任務(wù)

Task 標(biāo)識(shí)一個(gè)任務(wù)

“`go

// Member 任務(wù)組成員

type Member struct {

id    int

tasks []*Task

}

// ID 返回當(dāng)前 memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 為 member 添加任務(wù)

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 執(zhí)行任務(wù)

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf(Member %d run task %s\n , m.ID(), task.Execute())

}

}

// Task 任務(wù)

type Task struct {

name string

}

// Execute 執(zhí)行 task 返回結(jié)果

func (t *Task) Execute() string {

return Task + t.name + run success

}

“`

## Coordinator

Coordinator 是協(xié)調(diào)器,負(fù)責(zé)根據(jù) Member 和 task 進(jìn)行集群任務(wù)的協(xié)調(diào)調(diào)度

“`go

// Task 任務(wù)

type Task struct {

name string

}

// Execute 執(zhí)行 task 返回結(jié)果

func (t *Task) Execute() string {

return Task + t.name + run success

}

// Coordinator 協(xié)調(diào)者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 為 member 分配任務(wù)

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 構(gòu)建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通過當(dāng)前成員, 構(gòu)造成員隊(duì)列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加 member 組成員

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任務(wù)

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

“`

## 測(cè)試

我們首先創(chuàng)建一堆 member 和 task, 然后調(diào)用 coordinator 進(jìn)行任務(wù)分配,執(zhí)行任務(wù)結(jié)果

“`go

coordinator := NewCoordinator()

for i := 0; i i++ {

m := Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i i++ {

t := Task{name: fmt.Sprintf( task %d , i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

“`

## 結(jié)果

可以看到每個(gè) worker 均勻的得到任務(wù)分配

“`bash

Member 6 run task Task task 6 run success

Member 6 run task Task task 16 run success

Member 6 run task Task task 26 run success

Member 8 run task Task task 8 run success

Member 8 run task Task task 18 run success

Member 8 run task Task task 28 run success

Member 0 run task Task task 0 run success

Member 0 run task Task task 10 run success

Member 0 run task Task task 20 run success

Member 3 run task Task task 3 run success

Member 3 run task Task task 13 run success

Member 3 run task Task task 23 run success

Member 4 run task Task task 4 run success

Member 4 run task Task task 14 run success

Member 4 run task Task task 24 run success

Member 7 run task Task task 7 run success

Member 7 run task Task task 17 run success

Member 7 run task Task task 27 run success

Member 9 run task Task task 9 run success

Member 9 run task Task task 19 run success

Member 9 run task Task task 29 run success

Member 1 run task Task task 1 run success

Member 1 run task Task task 11 run success

Member 1 run task Task task 21 run success

Member 2 run task Task task 2 run success

Member 2 run task Task task 12 run success

Member 2 run task Task task 22 run success

Member 5 run task Task task 5 run success

Member 5 run task Task task 15 run success

Member 5 run task Task task 25 run success

“`

## 完整代碼

“`go

package main

import fmt

// CircularIterator 環(huán)狀迭代器

type CircularIterator struct {

list []interface{}

next int

}

// Next 返回下一個(gè)元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任務(wù)

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

// Member 任務(wù)組成員

type Member struct {

id    int

tasks []*Task

}

// ID 返回當(dāng)前 memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 為 member 添加任務(wù)

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 執(zhí)行任務(wù)

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf(Member %d run task %s\n , m.ID(), task.Execute())

}

}

// Task 任務(wù)

type Task struct {

name string

}

// Execute 執(zhí)行 task 返回結(jié)果

func (t *Task) Execute() string {

return Task + t.name + run success

}

// Coordinator 協(xié)調(diào)者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 為 member 分配任務(wù)

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 構(gòu)建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通過當(dāng)前成員, 構(gòu)造成員隊(duì)列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加 member 組成員

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任務(wù)

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

// NewCircularIterftor 返回迭代器

func NewCircularIterftor(list []interface{}) *CircularIterator {

iterator := CircularIterator{}

for _, item := range list {

iterator.Add(item)

}

return iterator

}

// NewCoordinator 返回協(xié)調(diào)器

func NewCoordinator() *Coordinator {

c := Coordinator{}

return c

}

func main() {

coordinator := NewCoordinator()

for i := 0; i i++ {

m := Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i i++ {

t := Task{name: fmt.Sprintf( task %d , i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

}

“`

任務(wù)協(xié)調(diào)是一個(gè)非常復(fù)雜的事情,內(nèi)部的任務(wù)平臺(tái),雖然實(shí)現(xiàn)了基于任務(wù)的組合和 app 化,但是任務(wù)調(diào)度分配著一塊,仍然沒有去做,只是簡單的根據(jù)樹形任務(wù)去簡單的做一些分支任務(wù)的執(zhí)行,未來有時(shí)間再做吧,要繼續(xù)研究下一個(gè)模塊了。

上述就是丸趣 TV 小編為大家分享的基于環(huán)狀隊(duì)列和迭代器如何實(shí)現(xiàn)分布式任務(wù) RR 分配策略了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注丸趣 TV 行業(yè)資訊頻道。

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-17發(fā)表,共計(jì)6353字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒有評(píng)論)
主站蜘蛛池模板: 怀集县| 巩义市| 泸州市| 长寿区| 英山县| 楚雄市| 铜陵市| 乌审旗| 重庆市| 赤水市| 苍溪县| 两当县| 那坡县| 阜平县| 崇信县| 寻甸| 延寿县| 永嘉县| 崇礼县| 江津市| 浮梁县| 平塘县| 绥棱县| 景德镇市| 家居| 富蕴县| 岢岚县| 应城市| 英超| 瑞安市| 屯留县| 阿勒泰市| 耿马| 新野县| 涡阳县| 楚雄市| 龙南县| 正安县| 交口县| 如东县| 富宁县|