激情六月丁香婷婷|亚洲色图AV二区|丝袜AV日韩AV|久草视频在线分类|伊人九九精品视频|国产精品一级电影|久草视频在线99|在线看的av网址|伊人99精品无码|午夜无码视频在线

高校合作1:010-59833514 ?咨詢電話:400-810-1418 服務(wù)與監(jiān)督電話:400-810-1418轉(zhuǎn)接2

golang 游戲開發(fā)(?超強指南!Golang 并發(fā)編程)

發(fā)布時間:2023-11-27 06:50:48 瀏覽量:114次

??超強指南!Golang 并發(fā)編程

golang 游戲開發(fā)(?超強指南!Golang 并發(fā)編程)

作者:dcguo,騰訊 CSIG 電子簽開放平臺中心

分享 Golang 并發(fā)基礎(chǔ)庫,擴展以及三方庫的一些常見問題、使用介紹和技巧,以及對一些并發(fā)庫的選擇和優(yōu)化探討。

提倡的原則

不要通過共享內(nèi)存進行通信;相反,通過通信來共享內(nèi)存。

Goroutine

goroutine 并發(fā)模型

調(diào)度器主要結(jié)構(gòu)

主要調(diào)度器結(jié)構(gòu)是 M,P,G

  1. M,內(nèi)核級別線程,goroutine 基于 M 之上,代表執(zhí)行者,底層線程,物理線程
  2. P,處理器,用來執(zhí)行 goroutine,因此維護了一個 goroutine 隊列,里面存儲了所有要執(zhí)行的 goroutine,將等待執(zhí)行的 G 與 M 對接,它的數(shù)目也代表了真正的并發(fā)度( 即有多少個 goroutine 可以同時進行 );
  3. G,goroutine 實現(xiàn)的核心結(jié)構(gòu),相當于輕量級線程,里面包含了 goroutine 需要的棧,程序計數(shù)器,以及所在 M 的信息

P 的數(shù)量由環(huán)境變量中的 GOMAXPROCS 決定,通常來說和核心數(shù)對應(yīng)。

用戶空間線程和內(nèi)核空間線程映射關(guān)系有如下三種:

  1. N:1
  2. 1:1
  3. M:N

關(guān)系如圖,灰色的 G 則是暫時還未運行的,處于就緒態(tài),等待被調(diào)度,這個隊列被 P 維護

注: 簡單調(diào)度圖如上,有關(guān)于 P 再多個 M 中切換,公共 goroutine 隊列,M 從線程緩存中創(chuàng)建等步驟沒有體現(xiàn),復(fù)雜過程可以參考文章簡單了解 goroutine 如何實現(xiàn)。

  • demo1
go list.Sort()
  • demo2
func Announce(message string, delay time.Duration) {
go func() {
time.Sleep(delay)
fmt.println(message)
}()
}

channel 特性

創(chuàng)建

// 創(chuàng)建 channel
a := make(chan int)
b := make(chan int, 10)
// 單向 channel
c := make(chan<- int)
d := make(<-chan int)

tip:

v, ok := <-a  // 檢查是否成功關(guān)閉(ok = false:已關(guān)閉)
  • use channel
ci := make(chan int)
cj := make(chan int, 0)
cs := make(chan *os.File, 100)
c := make(chan int)
go func() {
list.Sort()
c <- 1
}()
doSomethingForValue
<- c

func Server(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req)
<- sem
}()
}
}

func Server(queue chan *Requet) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<- sem
}(req)
}
}
func Serve(queue chan *Request) {
for req := range queue {
req := req
sem <- 1
go func() {
process(req)
<-sem
}()
}
}
c := make(chan bool)
go func() {
    // close 的 channel 會讀到一個零值
    close(c)
}()
<-c

開源項目【是一個支持集群的 im 及實時推送服務(wù)】里面的基準測試的案例

func main() {
 ret := make(chan string, 3)
 for i := 0; i < cap(ret); i++ {
  go call(ret)
 }
    fmt.Println(<-ret)
}
func call(ret chan<- string) {
 // do something
 // ...
 ret <- "result"
}

注: 協(xié)同多個 goroutines 方案很多,這里只展示 channel 的一種。

limits := make(chan struct{}, 2)
for i := 0; i < 10; i++ {
 go func() {
        // 緩沖區(qū)滿了就會阻塞在這
  limits <- struct{}{}
  do()
  <-limits
 }()
}
for {
 select {
    case a := <- testChanA:
        // todo a
    case b, ok := testChanB:
        // todo b, 通過 ok 判斷 tesChanB 的關(guān)閉情況
    default:
        // 默認分支
    }
}
func worker(testChan chan bool) {
    for {
     select {
        // todo some
  // case ...
        case <- testChan:
         testChan <- true
         return
     }
 }
}

func main() {
    testChan := make(chan bool)
    go worker(testChan)
    testChan <- true
    <- testChan
}
testChan := make(chan bool)
close(testChan)

zeroValue := <- testChan
fmt.Println(zeroValue) // false

testChan <- true // panic: send on closed channel

注: 如果是 buffered channel, 即使被 close, 也可以讀到之前存入的值,讀取完畢后開始讀零值,寫入則會觸發(fā) panic

for range
c := make(chan int, 20)
go func() {
 for i := 0; i < 10; i++ {
  c <- i
 }
 close(c)
}()
// 當 c 被關(guān)閉后,取完里面的元素就會跳出循環(huán)
for x := range c {
 fmt.Println(x)
}
func newUniqueIdService() <-chan string {
 id := make(chan string)
 go func() {
  var counter int64 = 0
  for {
   id <- fmt.Sprintf("%x", counter)
   counter += 1
  }
 }()
 return id
}
func newUniqueIdServerMain()  {
 id := newUniqueIdService()
 for i := 0; i < 10; i++ {
  fmt.Println(<- id)
 }
}

  1. 超時控制
func main() {
done := do()
select {
case <-done:
// logic
case <-time.After(3 * time.Second):
// timeout
}
}

demo

開源 im/goim 項目中的應(yīng)用


2.心跳

done := make(chan bool)
defer func() {
close(done)
}()
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-done:
ticker.Stop()
return
case <-ticker.C:
message.Touch()
}
}
}()
}
func main() {
 c := make(chan struct{})
 for i := 0; i < 5; i++ {
  go do(c)
 }
 close(c)
}
func do(c <-chan struct{}) {
    // 會阻塞直到收到 close
 <-c
 fmt.Println("hello")
}
func channel() {
    count := 10 // 最大并發(fā)
 sum := 100  // 總數(shù)

    c := make(chan struct{}, count)
    sc := make(chan struct{}, sum)
    defer close(c)
    defer close(sc)

    for i:=0; i<sum; i++ {
        c <- struct{}
        go func(j int) {
            fmt.Println(j)
            <- c // 執(zhí)行完畢,釋放資源
            sc <- struct {}{} // 記錄到執(zhí)行總數(shù)
        }
    }

    for i:=sum; i>0; i++ {
        <- sc
    }
}

這塊東西為什么放到 channel 之后,因為這里包含了一些低級庫,實際業(yè)務(wù)代碼中除了 context 之外用到都較少(比如一些鎖 mutex,或者一些原子庫 atomic),實際并發(fā)編程代碼中可以用 channel 就用 channel,這也是 go 一直比較推崇得做法 Share memory by communicating; don’t communicate by sharing memory

鎖,使用簡單,保護臨界區(qū)數(shù)據(jù)

使用的時候注意鎖粒度,每次加鎖后都要記得解鎖

Mutex demo

package main

import (
 "fmt"
 "sync"
 "time"
)

func main() {
 var mutex sync.Mutex
 wait := sync.WaitGroup{}

 now := time.Now()
 for i := 1; i <= 3; i++ {
  wait.Add(1)
  go func(i int) {
   mutex.Lock()
   time.Sleep(time.Second)
   mutex.Unlock()
   defer wait.Done()
  }(i)
 }
 wait.Wait()
 duration := time.Since(now)
 fmt.Print(duration)
}

結(jié)果: 可以看到整個執(zhí)行持續(xù)了 3 s 多,內(nèi)部多個協(xié)程已經(jīng)被 “鎖” 住了。

RWMutex demo

注意: 這東西可以并發(fā)讀,不可以并發(fā)讀寫/并發(fā)寫寫,不過現(xiàn)在即便場景是讀多寫少也很少用到這,一般集群環(huán)境都得分布式鎖了。

package main

import (
 "fmt"
 "sync"
 "time"
)

var m *sync.RWMutex

func init() {
 m = new(sync.RWMutex)
}

func main() {
 go read()
 go read()
 go write()

 time.Sleep(time.Second * 3)
}

func read()  {
 m.RLock()
 fmt.Println("startR")
 time.Sleep(time.Second)
 fmt.Println("endR")
 m.RUnlock()
}
func write()  {
 m.Lock()
 fmt.Println("startW")
 time.Sleep(time.Second)
 fmt.Println("endW")
 m.Unlock()
}

輸出:

可以對簡單類型進行原子操作

int32

int64

uint32

uint64

uintptr

unsafe.Pointer

可以進行得原子操作如下

增/減

比較并且交換

假定被操作的值未曾被改變, 并一旦確定這個假設(shè)的真實性就立即進行值替換

載入

為了原子的讀取某個值(防止寫操作未完成就發(fā)生了一個讀操作)

存儲

原子的值存儲函數(shù)

交換

原子交換

demo:增


package main

import (
 "fmt"
 "sync"
 "sync/atomic"
)

func main() {
 var sum uint64

 var wg sync.WaitGroup

 for i := 0; i < 100; i++ {
  wg.Add(1)
  go func() {
   for c := 0; c < 100; c++ {
    atomic.AddUint64(&sum, 1)
   }
   defer wg.Done()
  }()
 }

 wg.Wait()
 fmt.Println(sum)
}

結(jié)果:

waitGroup 是一個 waitGroup 對象可以等待一組 goroutinue 結(jié)束,但是他對錯誤傳遞,goroutinue 出錯時不再等待其他 goroutinue(減少資源浪費) 都不能很好的解決,那么 errGroup 可以解決這部分問題

注意

  • errGroup 中如果多個 goroutinue 錯誤,只會獲取第一個出錯的 goroutinue 的錯誤信息,后面的則不會被感知到;
  • errGroup 里面沒有做 panic 處理,代碼要保持健壯

demo: errGroup

golang 游戲開發(fā)(?超強指南!Golang 并發(fā)編程)

package main

import (
 "golang.org/x/sync/errgroup"
 "log"
 "net/http"
)

func main() {
 var g errgroup.Group
 var urls = []string{
  "https://github.com/",
  "errUrl",
 }
 for _, url := range urls {
  url := url
  g.Go(func() error {
   resp, err := http.Get(url)
   if err == nil {
    _ = resp.Body.Close()
   }
   return err
  })
 }
 err := g.Wait()
 if err != nil {
  log.Fatal("getErr", err)
  return
 }
}

結(jié)果:

保證了傳入的函數(shù)只會執(zhí)行一次,這常用在單例模式,配置文件加載,初始化這些場景下。

demo:

times := 10
 var (
  o  sync.Once
  wg sync.WaitGroup
 )
 wg.Add(times)
 for i := 0; i < times; i++ {
  go func(i int) {
   defer wg.Done()
   o.Do(func() {
    fmt.Println(i)
   })
  }(i)
 }
 wg.Wait()

結(jié)果:

go 開發(fā)已經(jīng)對他了解了太多

可以再多個 goroutinue 設(shè)置截止日期,同步信號,傳遞相關(guān)請求值

對他的說明文章太多了,詳細可以跳轉(zhuǎn)看這篇 一文理解 golang context

這邊列一個遇到得問題:

  • grpc 多服務(wù)調(diào)用,級聯(lián) cancelA -> B -> CA 調(diào)用 B,B 調(diào)用 C,當 A 不依賴 B 請求 C 得結(jié)果時,B 請求 C 之后直接返回 A,那么 A,B 間 context 被 cancel,而 C 得 context 也是繼承于前面,C 請求直接掛掉,只需要重新搞個 context 向下傳就好,記得帶上 reqId, logId 等必要信息。
  • 某些計算可以再 CPU 之間并行化,如果計算可以被劃分為不同的可獨立執(zhí)行的部分,那么他就是可并行化的,任務(wù)可以通過一個 channel 發(fā)送結(jié)束信號。假如我們可以再數(shù)組上進行一個比較耗時的操作,操作的值在每個數(shù)據(jù)上獨立,如下:type vector []float64

    func (v vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i ++ {
    v[i] += u.Op(v[i])
    }
    c <-
    1
    }

我們可以再每個 CPU 上進行循環(huán)無關(guān)的迭代計算,我們僅需要創(chuàng)建完所有的 goroutine 后,從 channel 中讀取結(jié)束信號進行計數(shù)即可。

這部分如需自己開發(fā),內(nèi)容其實可以分為兩部分能力去做

并發(fā)編程增強方案

工作流解決方案

需要去解決一些基礎(chǔ)問題

并發(fā)編程:

啟動 goroutine 時,增加防止程序 panic 能力

去封裝一些更簡單的錯誤處理方案,比如支持多個錯誤返回

限定任務(wù)的 goroutine 數(shù)量

工作流:

在每個工作流執(zhí)行到下一步前先去判斷上一步的結(jié)果

工作流內(nèi)嵌入一些攔截器

一般系統(tǒng)重要的查詢增加了緩存后,如果遇到緩存擊穿,那么可以通過任務(wù)計劃,加索等方式去解決這個問題,singleflight 這個庫也可以很不錯的應(yīng)對這種問題。

它可以獲取第一次請求得結(jié)果去返回給相同得請求 核心方法 Do 執(zhí)行和返回給定函數(shù)的值,確保某一個時間只有一個方法被執(zhí)行。
如果一個重復(fù)的請求進入,則重復(fù)的請求會等待前一個執(zhí)行完畢并獲取相同的數(shù)據(jù),返回值 shared 標識返回值 v 是否是傳遞給重復(fù)的調(diào)用請求。

一句話形容他的功能,它可以用來歸并請求,但是最好加上超時重試等機制,防止第一個 執(zhí)行 得請求出現(xiàn)超時等異常情況導(dǎo)致同時間大量請求不可用。

場景: 數(shù)據(jù)變化量小(key 變化不頻繁,重復(fù)率高),但是請求量大的場景

demo

package main

import (
 "golang.org/x/sync/singleflight"
 "log"
 "math/rand"
 "sync"
 "time"
)

var (
 g singleflight.Group
)

const (
 funcKey = "key"

 times = 5
 randomNum = 100
)

func init() {
 rand.Seed(time.Now().UnixNano())
}

func main() {
 var wg sync.WaitGroup
 wg.Add(times)

 for i := 0; i < times; i++ {
  go func() {
   defer wg.Done()
   num, err := run(funcKey)
   if err != nil {
    log.Fatal(err)
    return
   }
   log.Println(num)
  }()
 }
 wg.Wait()
}

func run(key string) (num int, err error) {
 v, err, isShare := g.Do(key, func() (interface{}, error) {
  time.Sleep(time.Second * 5)
  num = rand.Intn(randomNum) //[0,100)
  return num, nil
 })
 if err != nil {
  log.Fatal(err)
  return 0, err
 }
 data := v.(int)
 log.Println(isShare)
 return data, nil
}

連續(xù)執(zhí)行 3 次,返回結(jié)果如下,全部取了共享得結(jié)果:

但是注釋掉 time.Sleep(time.Second * 5) 再嘗試一次看看。

這次全部取得真實值

實踐: 伙伴部門高峰期可以減少 20% 的 Redis 調(diào)用, 大大減少了 Redis 的負載

注: 下面用到的方案因為開發(fā)時間較早,并不一定是以上多種方案中最優(yōu)的,選擇有很多種,使用那種方案只有有所考慮可以自圓其說即可。

建議: 項目中逐漸形成統(tǒng)一解決方案,從混亂到統(tǒng)一,逐漸小團隊內(nèi)對此類邏輯形成統(tǒng)一的一個解決標準,而不是大家對需求之外的控制代碼寫出各式各樣的控制邏輯。

  • 場景

批量校驗接口限頻單賬戶最高 100qps/s,整個系統(tǒng)多個校驗場景公用一個賬戶限頻需要限制批量校驗最高為 50~80 qps/s(需要預(yù)留令牌供其他場景使用,否則頻繁調(diào)用批量接口時候其他場景均會失敗限頻)。

  • 設(shè)計

1.使用 go routine 來并發(fā)進行三要素校驗,因為 go routinue,所以每次開啟 50 ~ 80 go routine 同時進行單次三要素校驗;

2.每輪校驗耗時 1s,如果所有 routinue 校驗后與校驗開始時間間隔不滿一秒,則需要主動程序睡眠至 1s,然后開始下輪校驗;

3.因為只是校驗場景,如果某次校驗失敗,最容易的原因其實是校驗方異常,或者被其他校驗場景再當前 1s 內(nèi)消耗過多令牌;那么整個批量接口返回 err,運營同學(xué)重新發(fā)起就好。

  • 代碼

代碼需要進行的優(yōu)化點:

    • 加鎖(推薦使用,最多不到 100 的競爭者數(shù)目,使用鎖性能影響微乎其微);
    • 給每個傳入 routine 的 element 數(shù)組包裝,增加一個 key 屬性,每個返回的 result 包含 key 通過 key 映射可以得到需要的一個順序。

1.sleep 1s 這個操作可以從調(diào)用前開始計時,調(diào)用完成后不滿 1s 補充至 1s,而不是每次最長調(diào)用時間 elapsedTime + 1s;

2.通道中獲取的三要素校驗結(jié)果順序和入?yún)?shù)據(jù)數(shù)組順序不對應(yīng),這里通過兩種方案:

3.分組調(diào)用
getElementResponseConcurrent 方法時,傳入切片可以省略部分計算,直接使用切片表達式。

elementNum := len(elements)
m := elementNum / 80
n := elementNum % 80
if m < 1 {
if results, err := getElementResponseConcurrent(ctx, elements, conn, caller); err != nil {
return nil, err
} else {
response.Results = results
return response, nil
}
} else {
results := make([]int64, 0)
if n != 0 {
m = m + 1
}
var result []int64
for i := 1; i <= m; i++ {
if i == m {
result, err = getElementResponseConcurrent(ctx, elements[(i-1)*80:(i-1)*80+n], conn, caller)
} else {
result, err = getElementResponseConcurrent(ctx, elements[(i-1)*80:i*80], conn, caller)
}
if err != nil {
return nil, err
}
results = append(results, result...)
}
response.Results = results
}

// getElementResponseConcurrent
func getElementResponseConcurrent(ctx context.Context, elements []*api.ThreeElements, conn *grpc.ClientConn,
caller *api.Caller) ([]int64, error) {
results := make([]int64, 0)

var chResult = make(chan int64)
chanErr := make(chan error)
defer close(chanErr)
wg := sync.WaitGroup{}

faceIdClient := api.NewFaceIdClient(conn)
for _, element := range elements {
wg.Add(1)
go func(element *api.ThreeElements) {
param := element.Param
verificationRequest := &api.CheckMobileVerificationRequest{
Caller: caller,
Param: param,
}
if verification, err := faceIdClient.CheckMobileVerification(ctx, verificationRequest); err != nil {
chanErr <- err
return
} else {
result := verification.Result
chanErr <- nil 
chResult <- result
}
defer wg.Done()
}(element)
}

for i := 0; i < len(elements); i++ {
if err := <-chanErr; err != nil {
return nil, err
}
var result = <-chResult
results = append(results, result)
}
wg.Wait()
time.Sleep(time.Second)
return results, nil
}

場景: 產(chǎn)品上線一年,逐步開始做數(shù)據(jù)分析和統(tǒng)計需求提供給運營使用,接入 Tdw 之前是直接采用接口讀歷史表進行的數(shù)據(jù)分析,涉及全量用戶的分析給用戶記錄打標簽,數(shù)據(jù)效率較低,所以采用并發(fā)分組方法,考慮協(xié)程比較輕量,從開始上線時間節(jié)點截止當前時間分共 100 組,代碼較為簡單。

問題: 本次接口不是上線最終版,核心分析方法僅測試環(huán)境少量數(shù)據(jù)就會有 N 多條慢查詢,所以這塊還需要去對整體資源業(yè)務(wù)背景問題去考慮,防止線上數(shù)據(jù)量較大還有慢查詢出現(xiàn) cpu 打滿。

func (s ServiceOnceJob) CompensatingHistoricalLabel(ctx context.Context,
request *api.CompensatingHistoricalLabelRequest) (response *api.CompensatingHistoricalLabelResponse, err error) {
if request.Key != interfaceKey {
return nil, transform.Simple("err")
}

ctx, cancelFunc := context.WithCancel(ctx)
var (
wg = new(sync.WaitGroup)
userRegisterDb = new(datareportdb.DataReportUserRegisteredRecords)
startNum = int64(0)
)
wg.Add(1)

countHistory, err := userRegisterDb.GetUserRegisteredCountForHistory(ctx, historyStartTime, historyEndTime)
if err != nil {
return nil, err
}

div := decimal.NewFromFloat(float64(countHistory)).Div(decimal.NewFromFloat(float64(theNumberOfConcurrent)))
f, _ := div.Float64()
num := int64(math.Ceil(f))

for i := 0; i < theNumberOfConcurrent; i++ {
go func(startNum int64) {
defer wg.Done()
for {
select {
case <- ctx.Done():
return
default:
userDataArr, err := userRegisterDb.GetUserRegisteredDataHistory(ctx, startNum, num)
if err != nil {
cancelFunc()
}
for _, userData := range userDataArr {
if err := analyseUserAction(userData); err != nil {
cancelFunc()
}
}
}
}
}(startNum)
startNum = startNum + num
}
wg.Wait()

return response, nil
}


實現(xiàn)思路和上面其實差不多,都是需要支持批量的特性,基本上現(xiàn)在業(yè)務(wù)中統(tǒng)一使用多協(xié)程處理。

  • 衡量指標,協(xié)程數(shù)目衡量

基本上可以這樣理解這件事

    • 不要一個請求 spawn 出太多請求,指數(shù)級增長。這一點,在第二點會受到加強;
    • 當你生成 goroutines,需要明確他們何時退出以及是否退出,良好管理每個 goroutines盡量保持并發(fā)代碼足夠簡單,這樣 grroutines 得生命周期就很明顯了,如果沒做到,那么要記錄下異常 goroutine 退出的時間和原因
    • 數(shù)目的話應(yīng)該需要多少搞多少,擴增服務(wù)而不是限制,限制一般或多或少都會不合理,不僅 delay 更會造成擁堵
    • 注意 協(xié)程泄露 問題,關(guān)注服務(wù)的指標。
  • 任何情況使用鎖一定要切記鎖的釋放,任何情況!任何情況!任何情況!即便是 panic 時也要記得鎖的釋放,否則可以有下面的情況
    • 代碼庫提供給他人使用,出現(xiàn) panic 時候被外部 recover,這時候就會導(dǎo)致鎖沒釋放。

一個 goroutine 啟動后沒有正常退出,而是直到整個服務(wù)結(jié)束才退出,這種情況下,goroutine 無法釋放,內(nèi)存會飆高,嚴重可能會導(dǎo)致服務(wù)不可用

goroutine 的退出其實只有以下幾種方式可以做到

    • main 函數(shù)退出
    • context 通知退出
    • goroutine panic 退出
    • goroutine 正常執(zhí)行完畢退出

大多數(shù)引起 goroutine 泄露的原因基本上都是如下情況

    • channel 阻塞,導(dǎo)致協(xié)程永遠沒有機會退出
    • 異常的程序邏輯(比如循環(huán)沒有退出條件)

杜絕:

  • 想要杜絕這種出現(xiàn)泄露的情況,需要清楚的了解 channel 再 goroutine 中的使用,循環(huán)是否有正確的跳出邏輯

排查:

  • go pprof 工具
  • runtime.NumGoroutine() 判斷實時協(xié)程數(shù)
  • 第三方庫

案例:

package main

import (
 "fmt"
 "net/http"
 _ "net/http/pprof"
 "runtime"
 "time"
)

func toLeak() {
 c := make(chan int)
 go func() {
  <-c
 }()
}

func main() {
 go toLeak()

 go func() {
  _ = http.ListenAndServe("0.0.0.0:8080", nil)
 }()

 c := time.Tick(time.Second)
 for range c {
  fmt.Printf("goroutine [nums]: %d\n", runtime.NumGoroutine())
 }
}

輸出:

pprof:

  • http://127.0.0.1:8080/debug/pprof/goroutine?debug=1

復(fù)雜情況也可以用其他的可視化工具:

  • go tool pprof -http=:8001 http://127.0.0.1:8080/debug/pprof/goroutine?debug=1

使用方便,支持鏈式調(diào)用

父協(xié)程捕獲子協(xié)程 panic

有鎖的地方就去用 channel 優(yōu)化,這句話可能有點絕對,肯定不是所有場景都可以做到,但是大多數(shù)場景絕 X 是可以的,干掉鎖去使用 channel 優(yōu)化代碼進行解耦絕對是一個有趣的事情。

分享一個很不錯的優(yōu)化 demo:

場景:

  • 一個簡單的即時聊天室,支持連接成功的用戶收發(fā)消息,使用 socket;
  • 客戶端發(fā)送消息到服務(wù)端,服務(wù)端可以發(fā)送消息到每一個客戶端。

分析:

  1. 需要一個鏈接池保存每一個客戶端;
  2. 客戶端發(fā)送消息到服務(wù)端,服務(wù)端遍歷鏈接池發(fā)送給各個客戶端
  • 用戶斷開鏈接,需要移除鏈接池的對應(yīng)鏈接,否則會發(fā)送發(fā)錯;
  • 遍歷發(fā)送消息,需要再 goroutine 中發(fā)送,不應(yīng)該被阻塞。

問題:

  • 上述有個針對鏈接池的并發(fā)操作

解決

  • 引入鎖

增加鎖機制,解決針對鏈接池的并發(fā)問題發(fā)送消息也需要去加鎖因為要防止出現(xiàn) panic: concurrent write to websocket connection

    • 導(dǎo)致的問題

假設(shè)網(wǎng)絡(luò)延時,用戶新增時候還有消息再發(fā)送中,新加入的用戶就無法獲得鎖了,后面其他的相關(guān)操作都會被阻塞導(dǎo)致問題。

使用 channel 優(yōu)化:

  • 引入 channel新增客戶端集合,包含三個通道
  • 鏈接新增通道 registerChan,鏈接移除通道 unregisterChan,發(fā)送消息通道 messageChan。

2.使用通道

  • 新增鏈接,鏈接丟入 registerChan;
  • 移除鏈接,鏈接丟入 unregisterChan;
  • 消息發(fā)送,消息丟入 messageChan;

3.通道消息方法,代碼來自于開源項目 簡單聊天架構(gòu)演變:

// 處理所有管道任務(wù)
func (room *Room) ProcessTask() {
log := zap.S()
log.Info("啟動處理任務(wù)")
for {
select {
case c := <-room.register:
log.Info("當前有客戶端進行注冊")
room.clientsPool[c] = true
case c := <-room.unregister:
log.Info("當前有客戶端離開")
if room.clientsPool[c] {
close(c.send)
delete(room.clientsPool, c)
}
case m := <-room.send:
for c := range room.clientsPool {
select {
case c.send <- m:
default:
break
}
}
}
}
}

結(jié)果:

成功使用 channel 替換了鎖。

  1. 父協(xié)程捕獲子協(xié)程 panic
  2. 啟發(fā)代碼 1: 微服務(wù)框架啟發(fā)代碼 2: 同步/異步工具包
  3. goroutine 如何實現(xiàn)
  4. 從簡單的即時聊天來看架構(gòu)演變(simple-chatroom)
golang 游戲開發(fā)(?超強指南!Golang 并發(fā)編程)

熱門課程推薦

熱門資訊

請綁定手機號

x

同學(xué)您好!

您已成功報名0元試學(xué)活動,老師會在第一時間與您取得聯(lián)系,請保持電話暢通!
確定