入门goroutine并发设计模式以及goroutine可视化工具
zmoh7166
8年前
<h2><strong>Daisy-Chain</strong></h2> <p>首先,为了防止过于枯燥,我先列出我最喜欢的一个模式:Daisy-Chain。这个模式比较复杂,对go的并发编程不太熟悉的同学,可以先看下面的模式。然后回过头来看这个。</p> <p>daisy chain会创建很多channel,然后把这些channel首尾相接级联起来,组成一条单向链,每个channel都在处理不同的子任务,最后的结果在链的末端输出。</p> <pre> <code class="language-go">func f(left, right chan int) { // 这个函数就把right的输出和left的输入联系起来了。 left <- 1 + <-right } func main() { const n = 10000 leftmost := make(chan int) right := leftmost left := leftmost // 创建长度为n的daisy链 for i := 0; i < n; i++ { right = make(chan int) go f(left, right) left = right } // 在链的最右端输入1,那么最左端就会得到10001 go func(c chan int) { c <- 1 }(right) fmt.Println(<-leftmost) }</code></pre> <p>整个过程类似下图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/7a573b8a5a6dd7182e67648e820297af.png"></p> <p>那么这个模式有什么用呢?它可以用来处理迭代算法,使得部分迭代运算并发执行。只要迭代的每个阶段都是相互独立的即可。比如,计算质数:</p> <pre> <code class="language-go">import ( "fmt" "os" "runtime/trace" "time" ) func Generate(ch chan<- int) { for i := 2; ; i++ { ch <- i // 这是为了方便gotrace绘图 time.Sleep(10 * time.Millisecond) } } func Filter(ch <-chan int, out chan<- int, prime int) { for { i := <-ch if i%prime != 0 { out <- i } else { fmt.Printf("[%d] filter out %d\n", prime, i) } } } func main() { // 这些也是gotrace要求插入的代码。下同 trace.Start(os.Stderr) ch := make(chan int) go Generate(ch) for i := 0; i < 10; i++ { prime := <-ch // step1 fmt.Println(prime) out := make(chan int) // step2 go Filter(ch, out, prime) // step3 ch = out // step4 } trace.Stop() }</code></pre> <p>仔细分析上面的代码,它的功能就是输出前10个正整数质数。至于细节就让我们一步步分析看看:</p> <p>首先,Generate从2开始遍历正整数,并且在一开始就被放入goroutine里了。结果会放在 ch 里;</p> <p>然后,在main中启动一个for循环,在循环的每个step1,都会从 ch 中读出一个质数。2当然是质数,但是后面每一步从ch中读取的都是质数吗?且看下面的代码。</p> <p>然后,step2会创建一个新channel out (类似上例的right),ch和它作为输入和输出创建一个Filter的goroutine,专门过滤能被step1的prime整除的数。所以在 out 中输出的都是不会被prime整除的数。</p> <p>最后在关键的step4, out 变成下一个 ch 。相当于增加了一节chain的长度。而ch在每个循环中输出的第一个数,都是被 <strong>之前</strong> 的所有 <strong>质数</strong> 无法整除的数,即下一个质数。</p> <p>输出日志如下:</p> <pre> <code class="language-go">2 3 [2] filter out 4 5 [2] filter out 6 7 [2] filter out 8 [3] filter out 9 [2] filter out 10 11 [2] filter out 12 13 [2] filter out 14 [3] filter out 15 [2] filter out 16 17 [2] filter out 18 19 [2] filter out 20 [3] filter out 21 [2] filter out 22 23 [2] filter out 24 [5] filter out 25 [2] filter out 26 [3] filter out 27 [2] filter out 28 29</code></pre> <p>为了更直观的展示整个过程,我用divan大神的 <a href="/misc/goto?guid=4959719851987661721" rel="nofollow,noindex">gotrace</a> 工具画出了goroutine的3d交互图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/34acc886c3a43c79e32442cac0267edc.png"></p> <p>其中每个红色竖线表示一个goroutine,时间轴是从上到下的,所以红线越长表示goroutine持续时间越长,也说明它生成的越早。</p> <p>可以看到,最早的一个goroutine获得的数字是3,4,…… 29,因为2已经被输出了,所以是3到29,然后下一个goroutine获得的就是5,7,9,…… 29,因为3被输出,而偶数都被过滤了。以此类推,最后输出的就是前10个质数。</p> <p>需要指出的是,这个算法并不是最高效的,但却是非常优雅的。</p> <p>关于gotrace的安装和使用,请移步 <a href="/misc/goto?guid=4959719851987661721" rel="nofollow,noindex">这里</a> 。我是根据他的方法给go1.6.3打了补丁后,就能使用了。</p> <p>好了,下面我们 <strong>换些基础的模式</strong> 讲一下:</p> <h2><strong>Ping-pong</strong></h2> <p>顾名思义,就是由2个goroutine相互踢皮球组成的模式。尽管它非常简单,但是却方便我们理解go的并发编程概念。</p> <p>代码如下:</p> <p>用一个int表示ball(球),管道表示table(桌子),两个goroutine就是2个运动员, 分别编号为1和2。</p> <pre> <code class="language-go">func main() { var Ball int table := make(chan int) go player("2", table) go player("1", table) // 首先把球放到“桌上” table <- Ball time.Sleep(1 * time.Second) // 1s后比赛结束…… <-table } func player(id string, table chan int) { for { ball := <-table log.Printf("%s got ball[%d]\n", id, ball) time.Sleep(50 * time.Millisecond) log.Printf("%s bounceback ball[%d]\n", id, ball) ball++ table <- ball } }</code></pre> <p>输出如下:</p> <pre> <code class="language-go">1 got ball[0] 1 bounceback ball[0] 2 got ball[1] 2 bounceback ball[1] 1 got ball[2] 1 bounceback ball[2] 2 got ball[3] 2 bounceback ball[3] 1 got ball[4] 1 bounceback ball[4] 2 got ball[5] 2 bounceback ball[5]</code></pre> <p>代码简洁易懂,很好理解(看不懂的同学请不要拍我)。</p> <p>下面,我们增加一位选手,让3个运动员一块打球</p> <pre> <code class="language-go">go player("2", table) go player("3", table) go player("1", table)</code></pre> <p>这下子热闹了,输出如下:</p> <pre> <code class="language-go">1 got ball[0] 1 bounceback ball[0] 2 got ball[1] 2 bounceback ball[1] 3 got ball[2] 3 bounceback ball[2] 1 got ball[3] 1 bounceback ball[3] 2 got ball[4] 2 bounceback ball[4] 3 got ball[5] 3 bounceback ball[5] 1 got ball[6] 1 bounceback ball[6] 2 got ball[7] 2 bounceback ball[7] 3 got ball[8] 3 bounceback ball[8]</code></pre> <p>看3个人有条不紊的相互击球。此时处女座一定非常满意,但是对于习惯了并发随机性的程序员来说,这实在有些过于美好:为什么它们的顺序如此协调,为什么1总是给2,2给3,3给1,而不是其他顺序呢?</p> <p>划重点了啊:</p> <p>The answer is because Go runtime holds waiting FIFO queue for receivers, that is goroutines ready to receive on the particular channel</p> <p>即,对于接收channel内容的goroutines来说,go的runtime会把它们分配到一个 FIFO队列 中,所以这些goroutines只能按照既定的顺序接收channel的内容,而不会弄乱。所以即使创建上百个palyers,顺序依然是固定的。go实在是太贴心了,有不有!</p> <h2><strong>Fan-In</strong></h2> <p>也叫“扇入”,应该是并发编程里面比较普通的一个模式了。fan-in会从多个管道读取输入,并汇总到一个channel输出,形象的比喻如下图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/9582e7f80fc3e0593b083b02164cf78f.png"></p> <p>示例代码如下</p> <pre> <code class="language-go">import ( "fmt" "math/rand" "os" "runtime/trace" "time" ) func main() { trace.Start(os.Stderr) c := fanIn(boring(1), boring(2)) for i := 0; i < 10; i++ { fmt.Println(<-c) } fmt.Println("You're both boring; I'm leaving.") trace.Stop() } func fanIn(input1, input2 <-chan int) <-chan int { c := make(chan int) go func() { for {c <- <-input1} }() go func() { for {c <- <-input2} }() return c } func boring(msg int) <-chan int { c := make(chan int) go func() { // We launch the goroutine from inside the function. for i := 0; ; i++ { c <- msg*1000 + i time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond) } }() return c // Return the channel to the caller. }</code></pre> <p>输出为:</p> <p>2000</p> <p>2001</p> <p>1001</p> <p>2002</p> <p>1002</p> <p>2003</p> <p>1003</p> <p>2004</p> <p>1004</p> <p>gotrace输出为(注意这是两次独立的运行结果):</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/17cf159b1b8f1d6f4c42e789a6d12884.png"></p> <p>可以看到,两次的结果都汇入了main线程,并且顺序输出,没有丢失数据,也没有死锁。</p> <p>当然,简单的情况,用select也可以。</p> <p>select设计的目的就是在channel中间通讯,谁的数据先到达,哪个case分支先执行。</p> <pre> <code class="language-go">c1 := boring(1) c2 := boring(2) for i := 0; i < 10; i++ { select { case v := <-c1: fmt.Println(v) case v := <-c2: fmt.Println(v) } }</code></pre> <h2><strong>Workers</strong></h2> <p>也叫FanOut(扇出),和扇入模式相反,工作模式是一个管道分发任务,多个goroutines来执行。</p> <p>示例代码如下:</p> <pre> <code class="language-go">import ( "fmt" "os" "runtime/trace" "sync" "time" ) func worker(ch <-chan int, wg *sync.WaitGroup) { defer wg.Done() for { task, ok := <-ch if !ok { return } time.Sleep(20 * time.Millisecond) fmt.Println("processing task", task) } } func pool(wg *sync.WaitGroup, workers, tasks int) { ch := make(chan int) for i := 0; i < workers; i++ { time.Sleep(1 * time.Millisecond) // spawn出很多worker线程 go worker(ch, wg) } for i := 0; i < tasks; i++ { time.Sleep(10 * time.Millisecond) // 开始分发任务,被激活的workers开始工作了 ch <- i } close(ch) } func main() { trace.Start(os.Stderr) var wg sync.WaitGroup wg.Add(36) go pool(&wg, 36, 36) wg.Wait() trace.Stop() }</code></pre> <p>代码略长,但是逻辑其实非常清晰。我在注释中也稍作了说明。</p> <p>注意( <strong>划重点了</strong> ), close(ch) 在这里很关键,它确定了每个worker退出的节点。当channel中的内容为空,同时它已经被close时, task, ok := <- ch 返回的ok==false,此时通知worker退出,wg标记完成,当所有的worker都完成时,wg.Wait()完成,转入下一行执行。</p> <p>在golang中,main不会自动等待所有子进程完成,如果没有退出检查,main进程会闪退,所有的子进程也会随之强制退出,所以在main里必须有退出检测机制,前几个例子我们使用的是time.Sleep和for循环,这里我们使用了WaitGroup。</p> <p>gotrace结果如下:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/a87fc19e432ec0aa7545dd5de7400bf8.png"></p> <p>圆柱体中心就是main进程中生成的pool进程,围绕它的是36个worker进程。蓝色箭头表示pool每隔10ms分发的任务,它们都被worker处理了。</p> <h2><strong>Servers</strong></h2> <p>server模式和fan_out类似,只不过它的worker线程是按需生成的,并且工作处理完毕后就释放。所以这种模式常应用到网站服务器上。在主进程中,有一个for循环,Accept函数一直阻塞着循环的进行,一旦有新的请求过来,Accept就会生成一个connection,然后主进程就创建一个子进程处理这个connection以及其他逻辑。</p> <p>示例代码如下:</p> <pre> <code class="language-go">import ( "fmt" "net" "os" "runtime/trace" "time" ) func handler(c net.Conn, ch chan int) { ch <- len(c.RemoteAddr().String()) time.Sleep(10 * time.Microsecond) c.Write([]byte("ok")) c.Close() } func logger(ch chan int) { for { time.Sleep(1500 * time.Millisecond) fmt.Println(<-ch) } } func server(l net.Listener, ch chan int) { for { c, err := l.Accept() if err != nil { continue } go handler(c, ch) } } func main() { trace.Start(os.Stderr) l, err := net.Listen("tcp", ":5000") if err != nil { panic(err) } ch := make(chan int) go logger(ch) go server(l, ch) time.Sleep(10 * time.Second) trace.Stop() }</code></pre> <p>可以看到,主进程生成了一个tcp连接,启动了server和logger两个子进程。server用来监听外网的请求,一旦请求过来,就会生成一个handler进程,用来处理connection。同时,handler还会通过管道和logger通讯,logger负责异步记录相应日志。</p> <p>这个程序运行时的输入需要模拟外部请求来产生,为此我写了一个脚本:</p> <pre> <code class="language-go">#!/bin/sh i=0 while [[ $i -lt 20 ]]; do # 通过nc发起tcp请求。每秒请求一次 echo "hello "$i | nc localhost 5000 sleep 1 ((++i)) done</code></pre> <p>运行时,先启动这个脚本,然后启动server或gotrace。</p> <p>gotrace的运行结果如下:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/acbde5548c20a49e2c93c6cf34833cc3.png"></p> <p>可以看到,尽管程序运行了10s,但是只处理了6个请求。这是因为logger占用了管道太长时间,使得handler的运行时间也延长到了1.5s以上。</p> <p>为了解决这个问题,我们正好借助上面介绍的Worker模式,提高logger的并发性。</p> <h3><strong>Server + Worker</strong></h3> <pre> <code class="language-go">import ( "fmt" "net" "os" "runtime/trace" "time" ) func handler(c net.Conn, ch chan int) { ch <- 0 time.Sleep(50 * time.Microsecond) c.Write([]byte("ok")) c.Close() } func logger(wch chan int) { for { fmt.Println(<-wch) // 这里主要耗时 time.Sleep(1500 * time.Millisecond) } } func pool(ch chan int, n int) { wch := make(chan int) for i := 0; i < n; i++ { go logger(wch) } for { wch <- <-ch } } func server(l net.Listener, ch chan int) { for { c, err := l.Accept() if err != nil { continue } go handler(c, ch) } } func main() { trace.Start(os.Stderr) l, err := net.Listen("tcp", ":5000") if err != nil { panic(err) } ch := make(chan int) go pool(ch, 36) go server(l, ch) time.Sleep(10 * time.Second) trace.Stop() }</code></pre> <p>其中pool函数跟上例类似,就是生成(spawn)很多worker,然后handle中生成的数据会先进入pool,由pool再分配给这些workers。</p> <p>3D图如下:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/3eeec7e1af34561789c95558c1170677.png"></p> <p>可以看到,此时server正好处理了10个请求。不再被logger拖延了。</p> <h2><strong>Concurrency & Parallelism</strong></h2> <p>注意我的题目是并发(concurrent)设计模式。那么并发和并行到底啥区别??</p> <ul> <li> <p>Concurrency: A condition that exists when at least two threads are making progress. A more generalized form of parallelism that can include time-slicing as a form of virtual parallelism.</p> <p>Concurrency(并发性):是一种广义的并行。在concurrence的语境下,两个线程/任务可以表面上看起来“像是”并行,但其实机器只有一个核,它们只是分享了时间块。当然,在多核的情况下,它可以是并行的。</p> </li> </ul> <ul> <li> <p>Parallelism(并行性):A condition that arises when at least two threads are executing simultaneously.</p> <p>这个就是狭义的并行,即线程、任务必须是同时进行的,否则不算parallelism。</p> </li> </ul> <p> </p> <p>来自:https://segmentfault.com/a/1190000007111208</p> <p> </p>