GO channel总结

作者: 分类: php 时间: 2026-05-29 评论: 暂无评论

1. Channel 四种读取方式

1.1 直接读取(无 ok 检测)

ch := make(chan int, 1)
v := <-ch // 不会 panic,但无法区分是真实零值还是关闭后零值

1.2 带 ok 检测的读取

v, ok := <-ch
// ok 为 true 时,表示成功从 channel 读到真实数据。
// ok 为 false 时,表示 channel 已关闭且无更多数据,此时 v 是类型零值。

1.3 range 循环读取

for v := range ch {
    // ...
}
  • 会持续从 channel 读取值,直到 channel 被关闭且数据读完,循环自动终止。
  • 等效于不断执行 v, ok := <-ch 并检查 ok,写法更简洁。
  • 注意:如果 channel 永远不关闭,会一直阻塞造成死锁(除非有其他退出机制)。

1.4 select 多路复用读取

select {
case v := <-ch:
    // 处理数据
default:
    // 非阻塞
}
  • 可以同时监听多个 channel,哪个有数据就执行对应的 case。
  • 配合 default 可实现非阻塞读取:若所有 channel 都无数据则立即执行 default。
  • 即使某个 channel 已关闭,读取也会立即得到零值(需结合 v, ok := <-ch 检测关闭)。

Channel 状态速记

操作nil channel正常 channelclosed channel
读取阻塞正常读取返回零值
写入阻塞正常写入panic
关闭panic正常关闭panic

读关零值,写关 panic;nil 两边都阻塞;无缓冲要配对,有缓冲看容量;关两次也要 panic。


2. sync.WaitGroup 等待多 goroutine

场景:汇总多个请求并发处理。如同时对多个用户发送通知、对多个文件压缩、大量图片缩略图生成、多 API 聚合。

2.1 基础用法

package main

import (
    "fmt"
    "sync"
    "time"
)

func work(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second * 2)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go work(i, &wg)
    }
    wg.Wait()
    fmt.Println("ALL DONE")
}

2.2 多服务聚合(带错误处理)

package main

func AggregateData(ctx context.Context) (Result, error) {
    var wg sync.WaitGroup
    var mu sync.Mutex
    var userData *User
    var orderData *Order
    var errs []error

    wg.Add(3)

    // 调用用户服务
    go func() {
        defer wg.Done()
        data, err := callUserService(ctx)
        mu.Lock()
        userData = data
        if err != nil {
            errs = append(errs, err)
        }
        mu.Unlock()
    }()

    // 调用订单服务
    go func() {
        defer wg.Done()
        data, err := callOrderService(ctx)
        mu.Lock()
        orderData = data
        if err != nil {
            errs = append(errs, err)
        }
        mu.Unlock()
    }()

    // 调用库存服务(类似)

    wg.Wait()
    // 汇总结果或处理错误
    if len(errs) > 0 {
        return Result{}, combineErrors(errs)
    }
    return merge(userData, orderData), nil
}

3. 生产者-消费者模式(Channel 实现)

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

// Order 任务结构体(模拟外卖订单)
type Order struct {
    ID       int
    Shop     string  // 店铺名
    Dish     string  // 菜品
    Distance float64 // 配送距离(km)
}

func GenOrderNo() string {
    now := time.Now().Unix()
    randNum := rand.Intn(10000)
    orderNo := fmt.Sprintf("%d%d", now, randNum)
    return orderNo
}

func main() {
    orderChan := make(chan Order, 5)

    var producerWg sync.WaitGroup
    var consumerWg sync.WaitGroup

    // 启动生产者(15个)
    for i := 1; i <= 15; i++ {
        producerWg.Add(1)
        go func(id int) {
            defer producerWg.Done()
            orderId, _ := strconv.ParseInt(GenOrderNo(), 10, 64)
            order := Order{
                ID:       int(orderId),
                Shop:     "厦门小吃店",
                Dish:     "海蛎煎",
                Distance: float64(9%3 + 1),
            }
            fmt.Printf("【生产者-%d】新订单:%d\n", id, order.ID)
            orderChan <- order
            time.Sleep(time.Millisecond * 10)
        }(i)
    }

    // 启动消费者(3个骑手)
    for i := 1; i <= 3; i++ {
        consumerWg.Add(1)
        go func(id int) {
            defer consumerWg.Done()
            for order := range orderChan {
                fmt.Printf("【骑手-%d】接到订单 %d,配送 %.1f km\n", id, order.ID, order.Distance)
                time.Sleep(time.Second * 5)
                fmt.Printf("【骑手-%d】订单 %d 已送达\n", id, order.ID)
            }
        }(i)
    }

    // 等待所有生产者完成 → 关闭 channel → 等待消费者完成
    producerWg.Wait()
    close(orderChan)
    consumerWg.Wait()

    fmt.Println("厦门外卖系统打烊啦~")
}

4. select 超时控制

package main

import (
    "context"
    "fmt"
    "time"
)

func callUserServe(ctx context.Context) (string, error) {
    timer := time.NewTimer(time.Second * 2)
    select {
    case <-timer.C:
        return "user_info", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func callOrderServe(ctx context.Context) (string, error) {
    timer := time.NewTimer(time.Second * 3)
    select {
    case <-timer.C:
        return "order_info", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    type result struct {
        name string
        data string
        err  error
    }
    resultCh := make(chan result, 2)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
    defer cancel()

    go func() {
        data, err := callUserServe(ctx)
        resultCh <- result{"user", data, err}
    }()

    go func() {
        data, err := callOrderServe(ctx)
        resultCh <- result{"order", data, err}
    }()

    ret := make(map[string]string)
    for i := 0; i < 2; i++ {
        select {
        case <-ctx.Done(): // 整体超时或上游取消
            fmt.Println("整体超时或上游取消!")
            return
        case res := <-resultCh:
            if res.err != nil {
                fmt.Printf("%s error: %v\n", "事务", res.err)
                return
            }
            ret[res.name] = res.data
        }
    }
    fmt.Printf("最后结果: %v\n", ret)
}

5. Worker Pool(带最大并发数控制)

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task func()

type WorkPool struct {
    maxWorkers int
    taskQueue  chan Task
    wg         sync.WaitGroup
    stopOnce   sync.Once
    stop       chan struct{}
}

func NewWorkPool(maxWorker int, queueSize int) *WorkPool {
    return &WorkPool{
        maxWorkers: maxWorker,
        taskQueue:  make(chan Task, queueSize),
        stop:       make(chan struct{}),
    }
}

func (p *WorkPool) worker() {
    defer p.wg.Done()
    for {
        select {
        case task, ok := <-p.taskQueue:
            if !ok {
                return // 任务队列已关闭,worker 退出
            }
            task()
        case <-p.stop:
            return // 收到停止信号,退出
        }
    }
}

func (p *WorkPool) Start() {
    for i := 0; i < p.maxWorkers; i++ {
        p.wg.Add(1)
        go p.worker()
    }
}

func (p *WorkPool) Submit(task Task) bool {
    select {
    case <-p.stop:
        return false
    default:
    }
    select {
    case p.taskQueue <- task:
        return true
    case <-p.stop:
        return false
    }
}

// StopGracefully 优雅关闭:不再接收新任务,等待所有已提交任务完成
func (p *WorkPool) StopGracefully() {
    p.stopOnce.Do(func() {
        close(p.taskQueue)
    })
    p.wg.Wait()
}

func main() {
    pool := NewWorkPool(2, 6) // 2个协程,队列长度6
    pool.Start()

    for i := 0; i < 10; i++ {
        taskID := i
        submit := pool.Submit(func() {
            fmt.Printf("task %d 开始\n", taskID)
            time.Sleep(500 * time.Millisecond)
            fmt.Printf("任务 %d 执行完毕\n", taskID)
        })
        if !submit {
            fmt.Printf("任务 %d 提交失败,池已停止\n", taskID)
        }
    }

    pool.StopGracefully()
    fmt.Println("pool stopped")
}

6. sync.Mutex 保护计数器

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.RWMutex
    value int
}

func (c *Counter) Add() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.value
}

func main() {
    var wg sync.WaitGroup
    c := &Counter{}

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Add()
        }()
    }

    wg.Wait()
    fmt.Println(c.Value())
}

7. sync.RWMutex 实现读多写少的缓存

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.RWMutex
    value int
}

func (c *Counter) Add() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.value
}

func main() {
    var wg sync.WaitGroup
    c := &Counter{}

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Add()
        }()
    }

    wg.Wait()
    fmt.Println(c.Value())
}

标签: none

订阅本站(RSS)