2026-06-01
go
0
go
sync 包的武器库: 互斥锁 Mutex / RWMutex — 保护共享数据 等待组 WaitGroup — 等一批 goroutine 干完 单次执行 Once — 只执行一次(单例、初始化) 对象池 Pool — 复用临时对象,降低 GC 压力 条件变量 Cond — 在特定条件下通知等待的 goroutine 原子操作 atomic(独立包) — 无锁的数值操作 并发 Map sync.Map(本章不讲) — 特定场景下的并发 map

Rob Pike 的经典原则: "Don't communicate by sharing memory; share memory by communicating."

翻译:别用共享内存来通信,用通信来共享内存。

但这不代表不能用锁。简单状态用锁,复杂协调用 channel。

go
// sync 包练习题 // 运行:go run . 或 go test -v // 用 -race 检测数据竞争:go test -race -v package main import ( "fmt" "sync" "time" ) // ============================================================ // 练习 1:修复数据竞争 // ============================================================ // 下面的代码有 data race,1000 个 goroutine 各加 100 次 // 结果大概率不等于 100000 // 任务:用 sync.Mutex 修复 // TODO: 添加 Mutex var counter int var mu sync.Mutex func increment() { mu.Lock() defer mu.Unlock() counter++ } func Exercise1() { fmt.Println("=== 练习 1:Mutex 修复数据竞争 ===") counter = 0 var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 100; j++ { increment() } }() } wg.Wait() fmt.Printf("counter = %d (期望 100000)\n", counter) } // ============================================================ // 练习 2:RWMutex 实现线程安全缓存 // ============================================================ // 实现以下方法,让 Cache 支持并发读写 // 提示:读操作用 RLock,写操作用 Lock type Cache struct { // TODO: 添加 RWMutex rw sync.RWMutex data map[string]string } func NewCache() *Cache { return &Cache{ data: make(map[string]string), } } func (c *Cache) Get(key string) (string, bool) { // TODO: 加读锁 c.rw.RLock() defer c.rw.RUnlock() val, ok := c.data[key] return val, ok // TODO: 释放读锁 } func (c *Cache) Set(key, val string) { // TODO: 加写锁 c.rw.Lock() defer c.rw.Unlock() c.data[key] = val // TODO: 释放写锁 } func (c *Cache) Delete(key string) { // TODO: 加写锁 c.rw.Lock() defer c.rw.Unlock() delete(c.data, key) // TODO: 释放写锁 } func Exercise2() { fmt.Println("=== 练习 2:RWMutex 实现并发缓存 ===") c := NewCache() var wg sync.WaitGroup // 10 个写 goroutine for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() key := fmt.Sprintf("key-%d", id) c.Set(key, fmt.Sprintf("value-%d", id)) }(i) } // 100 个读 goroutine for i := 0; i < 100; i++ { wg.Add(1) go func() { defer wg.Done() c.Get("key-0") // 这里希望读到 value-0 }() } wg.Wait() if v, ok := c.Get("key-0"); ok { fmt.Printf("key-0 = %s (期望 value-0)\n", v) } } // ============================================================ // 练习 3:WaitGroup 陷阱修复 // ============================================================ // 下面的代码有问题,请找出并修复 // 提示:至少有两个问题 func brokenWorker(wg *sync.WaitGroup, id int) { defer wg.Done() fmt.Printf("worker %d 完成\n", id) } func Exercise3() { fmt.Println("=== 练习 3:WaitGroup 陷阱 ===") var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go brokenWorker(&wg, i) // BUG:这里有什么问题? } wg.Wait() fmt.Println("all done") } // ============================================================ // 练习 4:用 sync.Once 实现线程安全的单例 // ============================================================ // 实现 GetDB() 函数,确保 DB 只初始化一次 type DB struct { conn string } var ( instance *DB // TODO: 添加 sync.Once once sync.Once ) func GetDB() *DB { // TODO: 用 Once 确保 conn 只初始化一次 once.Do(func() { if instance == nil { instance = &DB{conn: "connected"} fmt.Println("初始化数据库连接...") } }) return instance } func Exercise4() { fmt.Println("=== 练习 4:Once 实现单例 ===") var wg sync.WaitGroup results := make(chan *DB, 100) for i := 0; i < 100; i++ { wg.Add(1) go func() { defer wg.Done() results <- GetDB() }() } wg.Wait() close(results) // 验证所有 goroutine 拿到的都是同一个实例 first := <-results allSame := true for db := range results { if db != first { allSame = false break } } fmt.Printf("所有 goroutine 返回同一实例: %v (期望 true)\n", allSame) } // ============================================================ // 练习 5:用 sync.Pool 优化缓冲区分配 // ============================================================ // 下面的代码每次都分配新的 []byte,在高并发下 GC 压力大 // 任务:用 sync.Pool 优化 func processDataWithoutPool(p *sync.Pool, data string) string { // 每次调用都分配新缓冲区 buf := p.Get().([]byte) defer p.Put(buf[:0]) buf = append(buf, data...) buf = append(buf, " (processed)"...) return string(buf) } func Exercise5() { fmt.Println("=== 练习 5:sync.Pool 优化缓冲区 ===") // TODO: 创建 sync.Pool,New 函数返回 make([]byte, 0, 1024) pool := &sync.Pool{ New: func() any { return make([]byte, 0, 1024) }, } // TODO: 实现 processDataWithPool,用 Pool 获取/归还缓冲区 result := processDataWithoutPool(pool, "test") fmt.Println(result) fmt.Println("processDataWithoutPool: ok (待优化)") } // ============================================================ // 练习 6(挑战题):实现并发安全的计数器 // ============================================================ // 实现一个 SafeCounter,支持以下操作: // Inc() — 加 1 // Dec() — 减 1 // Value() — 返回当前值 // Reset() — 重置为 0 // WaitFor(n int) — 阻塞直到 Value() >= n // // 提示:WaitFor 可以用 Cond 实现 type SafeCounter struct { // TODO: 添加必要字段 mu sync.Mutex cond *sync.Cond val int } func NewSafeCounter() *SafeCounter { // TODO sc := &SafeCounter{} sc.cond = sync.NewCond(&sc.mu) return sc } func (sc *SafeCounter) Inc() { // TODO sc.mu.Lock() sc.val++ sc.cond.Broadcast() sc.mu.Unlock() } func (sc *SafeCounter) Dec() { // TODO sc.mu.Lock() sc.val-- sc.cond.Broadcast() sc.mu.Unlock() } func (sc *SafeCounter) Value() int { // TODO sc.mu.Lock() v := sc.val sc.mu.Unlock() return v } func (sc *SafeCounter) Reset() { // TODO sc.mu.Lock() sc.val = 0 sc.cond.Broadcast() sc.mu.Unlock() } func (sc *SafeCounter) WaitFor(target int) { // TODO: 用 Cond 实现阻塞等待 sc.mu.Lock() for sc.val < target { sc.cond.Wait() } sc.mu.Unlock() } func Exercise6() { fmt.Println("=== 练习 6(挑战):并发安全计数器 ===") sc := NewSafeCounter() var wg sync.WaitGroup // 启动 50 个 goroutine 各 Inc 100 次 for i := 0; i < 50; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 100; j++ { sc.Inc() } }() } // 再启动一个 goroutine 等待值达到 3000 wg.Add(1) go func() { defer wg.Done() sc.WaitFor(3000) fmt.Printf("计数器已达到 3000!当前值: %d\n", sc.Value()) }() wg.Wait() fmt.Printf("最终值: %d (期望 5000)\n", sc.Value()) } // ============================================================ // 练习 7(场景题):设计一个并发限流器 // ============================================================ // 实现一个 RateLimiter: // - 最多允许 N 个并发操作同时执行 // - Allow() bool — 尝试获取一个槽位,成功返回 true // - Release() — 释放一个槽位 type RateLimiter struct { // TODO sem chan any } func NewRateLimiter(maxConcurrency int) *RateLimiter { // TODO return &RateLimiter{ sem: make(chan any, maxConcurrency), } } func (rl *RateLimiter) Allow() bool { // TODO select { case rl.sem <- struct{}{}: return true default: return false } } func (rl *RateLimiter) Release() { // TODO <-rl.sem } func Exercise7() { fmt.Println("=== 练习 7(场景题):并发限流器 ===") limiter := NewRateLimiter(3) // 最多 3 个并发 var wg sync.WaitGroup active := make(chan struct{}, 10) for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for !limiter.Allow() { time.Sleep(10 * time.Millisecond) } defer limiter.Release() active <- struct{}{} current := len(active) if current > 3 { fmt.Printf("❌ 超过并发限制!当前活跃: %d\n", current) } time.Sleep(100 * time.Millisecond) // 模拟工作 <-active }(i) } wg.Wait() fmt.Println("rate limiter test done") } func main() { Exercise1() fmt.Println() Exercise2() fmt.Println() Exercise3() fmt.Println() Exercise4() fmt.Println() Exercise5() fmt.Println() Exercise6() fmt.Println() Exercise7() }