Go语言默认的 sync.RWMutex 实现在多核环境中表现并不佳,因为所有的读者在进行原子增量操作时,会抢占相同的内存地址。该文探讨了一种 n-way RWMutex,也可以称为“大读者(big reader)”锁,它可以为每个 CPU 内核分配独立的 RWMutex。读者仅需在其核心中处理读锁,而写者则须依次处理所有锁。

查找当前 CPU

读者使用 CPUID 指令来决定使用何种锁,该指令仅需返回当前活动 CPU 的 APICID,而不需要发出系统调用指令抑或改变运行时。这在 Intel 或 AMD 处理器上均是可以的;ARM 处理器则需要使用 CPU ID 寄存器 。 对于超过 256 个处理器的系统,必须使用 x2APIC, 另外除了 CPUID 还要用到带有EAX=0xb 的 EDX 寄存器。程序启动时,会构建(通过 CPU 亲和力系统调用) APICID 到 CPU 索引的映射, 该映射在处理器的整个生命周期中静态存在。由于 CPUID 指令的开销可能相当昂贵,goroutine 将只在其运行的内核中定期地更新状态结果。频繁更新可以减少内核锁阻塞,但同时也会导致花在加锁过程中的 CPUID 指令时间增加。

陈旧的 CPU 信息。如果加上锁运行 goroutine 的 CPU 信息可能会是过时的 (goroutine 会转移到另一个核心)。在 reader 记住哪个是上锁的前提下,这只会影响性能,而不会影响准确性,当然,这样的转移也是不太可能的,就像操作系统内核尝试在同一个核心保持线程来改进缓存命中率一样。


这个模式的性能特征会被大量的参数所影响。特别是 CPUID 检测频率,readers 的数量,readers 和 writers 的比率,还有 readers 持有锁的时间,这些因素都非常重要。当在这个时间有且仅有一个 writer 活跃的时候,这个 writer 持有锁的时期不会影响 sync.RWMutex 和 DRWMutex 之间的性能差异。



drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100

错误条表示第25和第75个百分位。注意每第10核的下降;这是因为10个核组成一个运行标准检查系统的机器上的NUMA节点, 所以一旦增加一个NUMA节点,跨线程通信量变得更加宝贵。对于DRWMutex来说,由于对比sync.RWMutex更多的reader能够并行工作,所以性能也随之提升。

#include "textflag.h"    // func cpu() uint64  TEXT 路cpu(SB),NOSPLIT,$0-8   MOVL $0x01, AX // version information   MOVL $0x00, BX // any leaf will do   MOVL $0x00, CX // any subleaf will do     // call CPUID   BYTE $0x0f   BYTE $0xa2     SHRQ $24, BX // logical cpu id is put in EBX[31-24]   MOVQ BX, ret+0(FP)   RET


package main    import (   "flag"   "fmt"   "math/rand"   "os"   "runtime"   "runtime/pprof"   "sync"   "syscall"   "time"   "unsafe"  )    func cpu() uint64 // implemented in cpu_amd64.s    var cpus map[uint64]int    // determine mapping from APIC ID to CPU index by pinning the entire process to  // one core at the time, and seeing that its APIC ID is.  func init() {   cpus = make(map[uint64]int)     var aff uint64   syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))     n := 0   start := time.Now()   var mask uint64 = 1  Outer:   for {    for (aff & mask) == 0 {     mask <<= 1     if mask == 0 || mask > aff {      break Outer     }    }      ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))    if ret != 0 {     panic(err.Error())    }      // what CPU do we have?    <-time.After(1 * time.Millisecond)    c := cpu()      if oldn, ok := cpus[c]; ok {     fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)    }      cpus[c] = n    mask <<= 1    n++   }     fmt.Printf("%d/%d cpus found in %v: %v\n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)     ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))   if ret != 0 {    panic(err.Error())   }  }    type RWMutex2 []sync.RWMutex    func (mx RWMutex2) Lock() {   for core := range mx {    mx[core].Lock()   }  }    func (mx RWMutex2) Unlock() {   for core := range mx {    mx[core].Unlock()   }  }    func main() {   cpuprofile := flag.Bool("cpuprofile", false, "enable CPU profiling")   locks := flag.Uint64("i", 10000, "Number of iterations to perform")   write := flag.Float64("p", 0.0001, "Probability of write locks")   wwork := flag.Int("w", 1, "Amount of work for each writer")   rwork := flag.Int("r", 100, "Amount of work for each reader")   readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")   checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")   flag.Parse()     var o *os.File   if *cpuprofile {    o, _ := os.Create("rw.out")    pprof.StartCPUProfile(o)   }     readers_per_core := *readers / runtime.GOMAXPROCS(0)     var wg sync.WaitGroup     var mx1 sync.RWMutex     start1 := time.Now()   for n := 0; n < runtime.GOMAXPROCS(0); n++ {    for r := 0; r < readers_per_core; r++ {     wg.Add(1)     go func() {      defer wg.Done()      r := rand.New(rand.NewSource(rand.Int63()))      for n := uint64(0); n < *locks; n++ {       if r.Float64() < *write {        mx1.Lock()        x := 0        for i := 0; i < *wwork; i++ {         x++        }        _ = x        mx1.Unlock()       } else {        mx1.RLock()        x := 0        for i := 0; i < *rwork; i++ {         x++        }        _ = x        mx1.RUnlock()       }      }     }()    }   }   wg.Wait()   end1 := time.Now()     t1 := end1.Sub(start1)   fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)     if *cpuprofile {    pprof.StopCPUProfile()    o.Close()      o, _ = os.Create("rw2.out")    pprof.StartCPUProfile(o)   }     mx2 := make(RWMutex2, len(cpus))     start2 := time.Now()   for n := 0; n < runtime.GOMAXPROCS(0); n++ {    for r := 0; r < readers_per_core; r++ {     wg.Add(1)     go func() {      defer wg.Done()      c := cpus[cpu()]      r := rand.New(rand.NewSource(rand.Int63()))      for n := uint64(0); n < *locks; n++ {       if *checkcpu != 0 && n%*checkcpu == 0 {        c = cpus[cpu()]       }         if r.Float64() < *write {        mx2.Lock()        x := 0        for i := 0; i < *wwork; i++ {         x++        }        _ = x        mx2.Unlock()       } else {        mx2[c].RLock()        x := 0        for i := 0; i < *rwork; i++ {         x++        }        _ = x        mx2[c].RUnlock()       }      }     }()    }   }   wg.Wait()   end2 := time.Now()     pprof.StopCPUProfile()   o.Close()     t2 := end2.Sub(start2)   fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)  }