共計(jì) 895 個(gè)字符,預(yù)計(jì)需要花費(fèi) 3 分鐘才能閱讀完成。
在 Go 語(yǔ)言中,可以使用 WaitGroup 來(lái)實(shí)現(xiàn)高并發(fā)的數(shù)據(jù)處理流水線。
WaitGroup 是一個(gè)計(jì)數(shù)信號(hào)量,用于等待一組并發(fā)操作完成。通過(guò) Add 方法可以增加計(jì)數(shù)器的值,Done 方法可以減少計(jì)數(shù)器的值,Wait 方法可以阻塞直到計(jì)數(shù)器變?yōu)?0。
下面是一個(gè)使用 WaitGroup 的高并發(fā)數(shù)據(jù)處理流水線的示例:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
numWorkers := 10
dataChan := make(chan int, 100)
// 第一階段,生成數(shù)據(jù)
wg.Add(1)
go func() {defer wg.Done()
for i := 0; i < 100; i++ {dataChan <- i}
close(dataChan)
}()
// 第二階段,處理數(shù)據(jù)
for i := 0; i < numWorkers; i++ {wg.Add(1)
go func() {defer wg.Done()
for data := range dataChan {result := processData(data)
fmt.Println(result)
}
}()}
wg.Wait()}
func processData(data int) int {
// 此處模擬數(shù)據(jù)處理過(guò)程
return data * 2
}
在這個(gè)示例中,首先創(chuàng)建了一個(gè) WaitGroup 對(duì)象 wg 和一個(gè)緩沖通道 dataChan,用于數(shù)據(jù)在各個(gè)階段之間的傳遞。
然后,在第一階段中,啟動(dòng)一個(gè) goroutine 來(lái)生成數(shù)據(jù),并向 dataChan 通道中發(fā)送數(shù)據(jù)。發(fā)送完數(shù)據(jù)后,通過(guò)調(diào)用 close(dataChan) 來(lái)關(guān)閉通道。
在第二階段中,通過(guò)循環(huán)啟動(dòng)多個(gè) goroutine 來(lái)處理數(shù)據(jù)。每個(gè) goroutine 從 dataChan 中接收數(shù)據(jù),然后調(diào)用 processData 函數(shù)來(lái)處理數(shù)據(jù),并打印處理結(jié)果。
最后,通過(guò)調(diào)用 wg.Wait() 來(lái)等待所有 goroutine 完成。
這樣,就可以實(shí)現(xiàn)一個(gè)高并發(fā)的數(shù)據(jù)處理流水線。在數(shù)據(jù)生成階段和數(shù)據(jù)處理階段之間使用通道進(jìn)行數(shù)據(jù)傳遞,通過(guò) WaitGroup 來(lái)等待所有 goroutine 完成。
丸趣 TV 網(wǎng) – 提供最優(yōu)質(zhì)的資源集合!