久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Golang中怎么實現一個工作池

148次閱讀
沒有評論

共計 5536 個字符,預計需要花費 14 分鐘才能閱讀完成。

本篇文章為大家展示了 Golang 中怎么實現一個工作池,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

worker pool 簡介

worker pool 其實就是線程池 thread pool。對于 go 來說,直接使用的是 goroutine 而非線程,不過這里仍然以線程來解釋線程池。

在線程池模型中,有 2 個隊列一個池子:任務隊列、已完成任務隊列和線程池。其中已完成任務隊列可能存在也可能不存在,依據實際需求而定。

只要有任務進來,就會放進任務隊列中。只要線程執行完了一個任務,就將任務放進已完成任務隊列,有時候還會將任務的處理結果也放進已完成隊列中。

worker pool 中包含了一堆的線程(worker,對 go 而言每個 worker 就是一個 goroutine),這些線程嗷嗷待哺,等待著為它們分配任務,或者自己去任務隊列中取任務。取得任務后更新任務隊列,然后執行任務,并將執行完成的任務放進已完成隊列。

下圖來自 wiki:

在 Go 中有兩種方式可以實現工作池:傳統的互斥鎖、channel。

傳統互斥鎖機制的工作池

假設 Go 中的任務的定義形式為:

type Task struct {   …}

每次有任務進來時,都將任務放在任務隊列中。

使用傳統的互斥鎖方式實現,任務隊列的定義結構大概如下:

type Queue struct{M sync.Mutex Tasks []Task }

然后在執行任務的函數中加上 Lock()和 Unlock()。例如:

func Worker(queue *Queue) {for { // Lock()和 Unlock()之間的是 critical section queue.M.Lock() // 取出任務 task := queue.Tasks[0] // 更新任務隊列 queue.Tasks = queue.Tasks[1:] queue.M.Unlock() // 在此 goroutine 中執行任務 process(task)    }}

假如在線程池中激活了 100 個 goroutine 來執行 Worker()。Lock()和 Unlock()保證了在同一時間點只能有一個 goroutine 取得任務并隨之更新任務列表,取任務和更新任務隊列都是 critical section 中的代碼,它們是具有原子性。然后這個 goroutine 可以執行自己取得的任務。于此同時,其它 goroutine 可以爭奪互斥鎖,只要爭搶到互斥鎖,就可以取得任務并更新任務列表。當某個 goroutine 執行完 process(task),它將因為 for 循環再次參與互斥鎖的爭搶。

上面只是給出了一點主要的代碼段,要實現完整的線程池,還有很多額外的代碼。

通過互斥鎖,上面的一切操作都是線程安全的。但問題在于加鎖 / 解鎖的機制比較重量級,當 worker(即 goroutine)的數量足夠多,鎖機制的實現將出現瓶頸。

通過 buffered channel 實現工作池

在 Go 中,也能用 buffered channel 實現工作池。

示例代碼很長,所以這里先拆分解釋每一部分,最后給出完整的代碼段。

在下面的示例中,每個 worker 的工作都是計算每個數值的位數相加之和。例如給定一個數值 234,worker 則計算 2 +3+4=9。這里交給 worker 的數值是隨機生成的 [0,999) 范圍內的數值。

這個示例有幾個核心功能需要先解釋,也是通過 channel 實現線程池的一般功能:

創建一個 task buffered channel,并通過 allocate()函數將生成的任務存放到 task buffered channel 中創建一個 goroutine pool,每個 goroutine 監聽 task buffered channel,并從中取出任務 goroutine 執行任務后,將結果寫入到 result buffered channel 中從 result buffered channel 中取出計算結果并輸出

首先,創建 Task 和 Result 兩個結構,并創建它們的通道:

type Task struct {   ID int randnum int} type Result struct {   task    Task    result int} var tasks = make(chan Task, 10) var results = make(chan Result, 10)

這里,每個 Task 都有自己的 ID,以及該任務將要被 worker 計算的隨機數。每個 Result 都包含了 worker 的計算結果 result 以及這個結果對應的 task,這樣從 Result 中就可以取出任務信息以及計算結果。

另外,兩個通道都是 buffered channel,容量都是 10。每個 worker 都會監聽 tasks 通道,并取出其中的任務進行計算,然后將計算結果和任務自身放進 results 通道中。

然后是計算位數之和的函數 process(),它將作為 worker 的工作任務之一。

func process(num int) int {   sum := 0 for num != 0 {        digit := num % 10 sum += digit        num /= 10}    time.Sleep(2 * time.Second) return sum}

這個計算過程其實很簡單,但隨后還睡眠了 2 秒,用來假裝執行一個計算任務是需要一點時間的。

然后是 worker(),它監聽 tasks 通道并取出任務進行計算,并將結果放進 results 通道。

func worker(wg *WaitGroup){defer wg.Done() for task := range tasks {       result := Result{task, process(task.randnum)}        results – result    }}

上面的代碼很容易理解,只要 tasks channel 不關閉,就會一直監聽該 channel。需要注意的是,該函數使用指針類型的 *WaitGroup 作為參數,不能直接使用值類型的 WaitGroup 作為參數,這樣會使得每個 worker 都有一個自己的 WaitGroup。

然后是創建工作池的函數 createWorkerPool(),它有一個數值參數,表示要創建多少個 worker。

func createWorkerPool(numOfWorkers int) {var wg sync.WaitGroup for i := 0; i numOfWorkers; i++ {        wg.Add(1) go worker(wg)    }    wg.Wait() close(results)}

創建工作池時,首先創建一個 WaitGroup 的值 wg,這個 wg 被工作池中的所有 goroutine 共享,每創建一個 goroutine 都 wg.Add(1)。創建完所有的 goroutine 后等待所有的 groutine 都執行完它們的任務,只要有一個任務還沒有執行完,這個函數就會被 Wait()阻塞。當所有任務都執行完成后,關閉 results 通道,因為沒有結果再需要向該通道寫了。

當然,這里是否需要關閉 results 通道,是由稍后的 range 迭代這個通道決定的,不關閉這個通道會一直阻塞 range,最終導致死鎖。

工作池部分已經完成了。現在需要使用 allocate()函數分配任務:生成一大堆的隨機數,然后將 Task 放進 tasks 通道。該函數有一個代表創建任務數量的數值參數:

func allocate(numOfTasks int) {for i := 0; i numOfTasks; i++ {

randnum := rand.Intn(999)        task := Task{i, randnum}        tasks – task    } close(tasks)}

注意,最后需要關閉 tasks 通道,因為所有任務都分配完之后,沒有任務再需要分配。當然,這里之所以需要關閉 tasks 通道,是因為 worker()中使用了 range 迭代 tasks 通道,如果不關閉這個通道,worker 將在取完所有任務后一直阻塞,最終導致死鎖。

再接著的是取出 results 通道中的結果進行輸出,函數名為 getResult():

func getResult(done chan bool) {for result := range results {        fmt.Printf( Task id %d, randnum %d , sum %d\n , result.task.id, result.task.randnum, result.result)    }    done – true }

getResult()中使用了一個 done 參數,這個參數是一個信號通道,用來表示 results 中的所有結果都取出來并處理完成了,這個通道不一定要用 bool 類型,任何類型皆可,它不用來傳數據,僅用來返回可讀,所以上面直接 close(done)的效果也一樣。通過下面的 main()函數,就能理解 done 信號通道的作用。

最后還差 main()函數:

func main() { // 記錄起始終止時間,用來測試完成所有任務耗費時長 startTime := time.Now()        numOfWorkers := 20 numOfTasks := 100 // 創建任務到任務隊列中 go allocate(numOfTasks) // 創建工作池 go createWorkerPool(numOfWorkers) // 取得結果 var done = make(chan bool) go getResult(done) // 如果 results 中還有數據,將阻塞在此 // 直到發送了信號給 done 通道 – done    endTime := time.Now()    diff := endTime.Sub(startTime)    fmt.Println(total time taken , diff.Seconds(), seconds )}

上面分配了 20 個 worker,這 20 個 worker 總共需要處理的任務數量為 100。但注意,無論是 tasks 還是 results 通道,容量都是 10,意味著任務隊列最長只能是 10 個任務。

下面是完整的代碼段:

package main import (fmt math/rand sync time) type Task struct {

id int randnum int } type Result struct {   task   Task    result int} var tasks = make(chan Task, 10) var results = make(chan Result, 10) func process(num int) int {   sum := 0 for num != 0 {        digit := num % 10 sum += digit        num /= 10}    time.Sleep(2 * time.Second) return sum} func worker(wg *sync.WaitGroup) {defer wg.Done() for task := range tasks {       result := Result{task, process(task.randnum)}        results – result    }} func createWorkerPool(numOfWorkers int) {var wg sync.WaitGroup for i := 0; i numOfWorkers; i++ {        wg.Add(1) go worker(wg)    }    wg.Wait() close(results)} func allocate(numOfTasks int) {for i := 0; i numOfTasks; i++ {        randnum := rand.Intn(999)        task := Task{i, randnum}        tasks – task    } close(tasks)} func getResult(done chan bool) {for result := range results {        fmt.Printf( Task id %d, randnum %d , sum %d\n , result.task.id, result.task.randnum, result.result)    }    done – true } func main() {    startTime := time.Now()    numOfWorkers := 20 numOfTasks := 100 var done = make(chan bool) go getResult(done) go allocate(numOfTasks) go createWorkerPool(numOfWorkers) // 必須在 allocate()和 getResult()之后創建工作池 -done    endTime := time.Now()    diff := endTime.Sub(startTime)    fmt.Println(total time taken , diff.Seconds(), seconds )}

執行結果:

Task id 19, randnum 914 , sum 14 Task id 9, randnum 150 , sum 6 Task id 15, randnum 215 , sum 8 …………Task id 97, randnum 315 , sum 9 Task id 99, randnum 641 , sum 11 total time taken 10.0174705 seconds

總共花費 10 秒。

可以試著將任務數量、worker 數量修改修改,看看它們的性能比例情況。例如,將 worker 數量設置為 99,將需要 4 秒,將 worker 數量設置為 10,將需要 20 秒。

上述內容就是 Golang 中怎么實現一個工作池,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-03發表,共計5536字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 同仁县| 崇明县| 册亨县| 东乌| 尉氏县| 叶城县| 慈利县| 横山县| 城固县| 拉孜县| 普定县| 岑溪市| 咸宁市| 铁岭市| 韩城市| 措美县| 开封县| 元朗区| 茶陵县| 体育| 蓝山县| 黔南| 沭阳县| 武冈市| 白水县| 社会| 砀山县| 云霄县| 华宁县| 临夏市| 汉沽区| 怀安县| 太仓市| 广德县| 义乌市| 西丰县| 雅安市| 无棣县| 徐汇区| 神木县| 东明县|