gosync 包的武器库:
互斥锁 Mutex / RWMutex — 保护共享数据
等待组 WaitGroup — 等一批 goroutine 干完
单次执行 Once — 只执行一次(单例、初始化)
对象池 Pool — 复用临时对象,降低 GC 压力
条件变量 Cond — 在特定条件下通知等待的 goroutine
原子操作 atomic(独立包) — 无锁的数值操作
并发 Map sync.Map(本章不讲) — 特定场景下的并发 map
Rob Pike 的经典原则: "Don't communicate by sharing memory; share memory by communicating."
翻译:别用共享内存来通信,用通信来共享内存。
但这不代表不能用锁。简单状态用锁,复杂协调用 channel。
go// sync 包练习题
// 运行:go run . 或 go test -v
// 用 -race 检测数据竞争:go test -race -v
package main
import (
"fmt"
"sync"
"time"
)
// ============================================================
// 练习 1:修复数据竞争
// ============================================================
// 下面的代码有 data race,1000 个 goroutine 各加 100 次
// 结果大概率不等于 100000
// 任务:用 sync.Mutex 修复
// TODO: 添加 Mutex
var counter int
var mu sync.Mutex
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
func Exercise1() {
fmt.Println("=== 练习 1:Mutex 修复数据竞争 ===")
counter = 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
increment()
}
}()
}
wg.Wait()
fmt.Printf("counter = %d (期望 100000)\n", counter)
}
// ============================================================
// 练习 2:RWMutex 实现线程安全缓存
// ============================================================
// 实现以下方法,让 Cache 支持并发读写
// 提示:读操作用 RLock,写操作用 Lock
type Cache struct {
// TODO: 添加 RWMutex
rw sync.RWMutex
data map[string]string
}
func NewCache() *Cache {
return &Cache{
data: make(map[string]string),
}
}
func (c *Cache) Get(key string) (string, bool) {
// TODO: 加读锁
c.rw.RLock()
defer c.rw.RUnlock()
val, ok := c.data[key]
return val, ok
// TODO: 释放读锁
}
func (c *Cache) Set(key, val string) {
// TODO: 加写锁
c.rw.Lock()
defer c.rw.Unlock()
c.data[key] = val
// TODO: 释放写锁
}
func (c *Cache) Delete(key string) {
// TODO: 加写锁
c.rw.Lock()
defer c.rw.Unlock()
delete(c.data, key)
// TODO: 释放写锁
}
func Exercise2() {
fmt.Println("=== 练习 2:RWMutex 实现并发缓存 ===")
c := NewCache()
var wg sync.WaitGroup
// 10 个写 goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", id)
c.Set(key, fmt.Sprintf("value-%d", id))
}(i)
}
// 100 个读 goroutine
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.Get("key-0") // 这里希望读到 value-0
}()
}
wg.Wait()
if v, ok := c.Get("key-0"); ok {
fmt.Printf("key-0 = %s (期望 value-0)\n", v)
}
}
// ============================================================
// 练习 3:WaitGroup 陷阱修复
// ============================================================
// 下面的代码有问题,请找出并修复
// 提示:至少有两个问题
func brokenWorker(wg *sync.WaitGroup, id int) {
defer wg.Done()
fmt.Printf("worker %d 完成\n", id)
}
func Exercise3() {
fmt.Println("=== 练习 3:WaitGroup 陷阱 ===")
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go brokenWorker(&wg, i) // BUG:这里有什么问题?
}
wg.Wait()
fmt.Println("all done")
}
// ============================================================
// 练习 4:用 sync.Once 实现线程安全的单例
// ============================================================
// 实现 GetDB() 函数,确保 DB 只初始化一次
type DB struct {
conn string
}
var (
instance *DB
// TODO: 添加 sync.Once
once sync.Once
)
func GetDB() *DB {
// TODO: 用 Once 确保 conn 只初始化一次
once.Do(func() {
if instance == nil {
instance = &DB{conn: "connected"}
fmt.Println("初始化数据库连接...")
}
})
return instance
}
func Exercise4() {
fmt.Println("=== 练习 4:Once 实现单例 ===")
var wg sync.WaitGroup
results := make(chan *DB, 100)
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
results <- GetDB()
}()
}
wg.Wait()
close(results)
// 验证所有 goroutine 拿到的都是同一个实例
first := <-results
allSame := true
for db := range results {
if db != first {
allSame = false
break
}
}
fmt.Printf("所有 goroutine 返回同一实例: %v (期望 true)\n", allSame)
}
// ============================================================
// 练习 5:用 sync.Pool 优化缓冲区分配
// ============================================================
// 下面的代码每次都分配新的 []byte,在高并发下 GC 压力大
// 任务:用 sync.Pool 优化
func processDataWithoutPool(p *sync.Pool, data string) string {
// 每次调用都分配新缓冲区
buf := p.Get().([]byte)
defer p.Put(buf[:0])
buf = append(buf, data...)
buf = append(buf, " (processed)"...)
return string(buf)
}
func Exercise5() {
fmt.Println("=== 练习 5:sync.Pool 优化缓冲区 ===")
// TODO: 创建 sync.Pool,New 函数返回 make([]byte, 0, 1024)
pool := &sync.Pool{
New: func() any {
return make([]byte, 0, 1024)
},
}
// TODO: 实现 processDataWithPool,用 Pool 获取/归还缓冲区
result := processDataWithoutPool(pool, "test")
fmt.Println(result)
fmt.Println("processDataWithoutPool: ok (待优化)")
}
// ============================================================
// 练习 6(挑战题):实现并发安全的计数器
// ============================================================
// 实现一个 SafeCounter,支持以下操作:
// Inc() — 加 1
// Dec() — 减 1
// Value() — 返回当前值
// Reset() — 重置为 0
// WaitFor(n int) — 阻塞直到 Value() >= n
//
// 提示:WaitFor 可以用 Cond 实现
type SafeCounter struct {
// TODO: 添加必要字段
mu sync.Mutex
cond *sync.Cond
val int
}
func NewSafeCounter() *SafeCounter {
// TODO
sc := &SafeCounter{}
sc.cond = sync.NewCond(&sc.mu)
return sc
}
func (sc *SafeCounter) Inc() {
// TODO
sc.mu.Lock()
sc.val++
sc.cond.Broadcast()
sc.mu.Unlock()
}
func (sc *SafeCounter) Dec() {
// TODO
sc.mu.Lock()
sc.val--
sc.cond.Broadcast()
sc.mu.Unlock()
}
func (sc *SafeCounter) Value() int {
// TODO
sc.mu.Lock()
v := sc.val
sc.mu.Unlock()
return v
}
func (sc *SafeCounter) Reset() {
// TODO
sc.mu.Lock()
sc.val = 0
sc.cond.Broadcast()
sc.mu.Unlock()
}
func (sc *SafeCounter) WaitFor(target int) {
// TODO: 用 Cond 实现阻塞等待
sc.mu.Lock()
for sc.val < target {
sc.cond.Wait()
}
sc.mu.Unlock()
}
func Exercise6() {
fmt.Println("=== 练习 6(挑战):并发安全计数器 ===")
sc := NewSafeCounter()
var wg sync.WaitGroup
// 启动 50 个 goroutine 各 Inc 100 次
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
sc.Inc()
}
}()
}
// 再启动一个 goroutine 等待值达到 3000
wg.Add(1)
go func() {
defer wg.Done()
sc.WaitFor(3000)
fmt.Printf("计数器已达到 3000!当前值: %d\n", sc.Value())
}()
wg.Wait()
fmt.Printf("最终值: %d (期望 5000)\n", sc.Value())
}
// ============================================================
// 练习 7(场景题):设计一个并发限流器
// ============================================================
// 实现一个 RateLimiter:
// - 最多允许 N 个并发操作同时执行
// - Allow() bool — 尝试获取一个槽位,成功返回 true
// - Release() — 释放一个槽位
type RateLimiter struct {
// TODO
sem chan any
}
func NewRateLimiter(maxConcurrency int) *RateLimiter {
// TODO
return &RateLimiter{
sem: make(chan any, maxConcurrency),
}
}
func (rl *RateLimiter) Allow() bool {
// TODO
select {
case rl.sem <- struct{}{}:
return true
default:
return false
}
}
func (rl *RateLimiter) Release() {
// TODO
<-rl.sem
}
func Exercise7() {
fmt.Println("=== 练习 7(场景题):并发限流器 ===")
limiter := NewRateLimiter(3) // 最多 3 个并发
var wg sync.WaitGroup
active := make(chan struct{}, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for !limiter.Allow() {
time.Sleep(10 * time.Millisecond)
}
defer limiter.Release()
active <- struct{}{}
current := len(active)
if current > 3 {
fmt.Printf("❌ 超过并发限制!当前活跃: %d\n", current)
}
time.Sleep(100 * time.Millisecond) // 模拟工作
<-active
}(i)
}
wg.Wait()
fmt.Println("rate limiter test done")
}
func main() {
Exercise1()
fmt.Println()
Exercise2()
fmt.Println()
Exercise3()
fmt.Println()
Exercise4()
fmt.Println()
Exercise5()
fmt.Println()
Exercise6()
fmt.Println()
Exercise7()
}