Go 语言中漂亮的并发特性
时不时地学习一门新的编程语言对你来说是大有裨益的,哪怕这门语言并不那么成功甚至有些过时。用新的编程语言来解决老的编程问题会迫使你对自己的某些观点、方法甚至是习惯进行重新思考。
我喜欢尝试新的东西,尤其是编程语言。但是如果只编写了“Hellow World"或者实现了Fibonacci数列,你通常会对这门语言毫无感觉、甚至觉得索然寡味。你应该去试着实现Eratosthenes筛法,尝试一些数据结构或者感受一下它的性能表现。但是我想要的是更加现实的、甚至能够为以后复用的代码。所以不久前我为自己出了个题目,来帮助我只通过几百行代码就可以体会到新的编程语言的感觉。
这个项目涉及一个语言中的几个基本元素:字符串,文件,网络I/O,当然还有并发性。这个项目叫做TCP/IP代理(或者你可以叫做网络调试器)。它的理念是,你拥有一个TCP/IP监听器(单线程或多线程)监听一个指定的端口并接受外来的连接。当他接受到一个接入请求,它就会建立一个连接,并且在远程客户端与服务器之间做双向数据传输。另外,这个代理可以记录日志,以不同的格式来记录,以便日后做分析。
当我需要这个工具的时候,我不再需要到处去找。每次涉及网络编程的时候,这样一个工具是必须的。我已经使用过不同的语言来实现这个工具,包括C, C++, Perl, PHP。最近的两个实现是使用python 和 Erlang。它代表着我正在寻找的答案。
我们可以再具体说说我们的需求。这个应用必须支持同时建立多个连接。对于每一个连接,它需要通过3种途径记录数据:一个以十六进制顺序记录来自于双向传输的数据日志,两个用于分开记录进和出的数据流的二进制日志。
我们在这篇文章中将实现这个程序,我们使用的语言是Go。Go的作者声称,Go骨子里就渗透着并发和多线程特性。我想把它带到我们的世界。
如果我通过C++来实现,我可能就需要main监听线程,和每个连接的线程。所以,单独一个连接就可以通过一个线程,而得到完整的服务(I/O和日志记录)。
以下是我在Go实现中用于服务每个连接的线程:
- 一个双向十六进制日志记录线程
- 两个以二进制记录进和出的数据流的线程
- 两个用于在服务器和远程主机间传输数据的线程
总共5个线程。
5个线程在为一个独立的连接服务。我实现了这些线程,不是为了多线程本身,而是因为Go鼓励多线程,而C++不鼓励(及时最新的C++x11标准也类似)。多线程在Go中是如此自然而简单。我在Go语言中实现TCP/IP代理,没有使用锁和条件变量。同步由Go的channel方式进行优雅的管理。
好吧,这里有源代码,包含解释。如果你不熟悉Go,注释会有帮助。我的本意不仅关注于这个程序的功能,也关注Go语言本身。
现在开始
从 2-11 行我们声明了一些要用到的包。值得注意的是,如果引入的包没有用到,Go 视之为一个错误并且强制删除没用的声明(在C++项目中,你最后完成了,记得什么时候清理过 STL 的头文件吗?)
package main import ( "encoding/hex" "flag" "fmt" "net" "os" "runtime" "strings" "time" )从 12-16行我们声明了一些代表命令行标志的全局变量。后面,我们会看到如何解析他们。
var ( host *string = flag.String("host", "","target host or address") port *string = flag.String("port", "0", "target port") listen_port *string = flag.String("listen_port", "0","listen port") )
从 17-20 行我们看到Go语言中可变参数函数的语法结构。
func die(format string, v ...interface{}) { os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...)) os.Exit(1) } |
从 21-28 行有两个函数分别启动十六进制数据日志和二进制流日志。他们唯一的区别是日志名称不同。
func connection_logger(data chan []byte, conn_n int, local_info, remote_info string) { log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log",format_time(time.Now()), conn_n, local_info, remote_info) logger_loop(data, log_name) } func binary_logger(data chan []byte, conn_n int, peer string) { log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log", format_time(time.Now()), conn_n, peer) logger_loop(data, log_name) } |
在第 29-43 行,你能看到 Go 真正的乐趣。函数 logger_loop 创建一个日志文件,然无限循环的写入(35-42行)。在第36行,代码等待来自通道 data 的消息。在第34行有一个很很有趣的技巧。操作符 defer 允许我们定义一段在函数功能结束时执行的代码(类似Java中的 finally)。如果接受到空数据,函数将退出。
func logger_loop(data chan []byte, log_name string) { f, err := os.Create(log_name) if err != nil { die("Unable to create file %s, %v\n", log_name, err) } defer f.Close() for { b := <-data if len(b) == 0 { break } f.Write(b) f.Sync() } } func format_time(t time.Time) string { return t.Format("2006.01.02-15.04.05") } func printable_addr(a net.Addr) string { return strings.Replace(a.String(), ":", "-", -1) } type Channel struct { from, to net.Conn logger, binary_logger chan []byte ack chan bool }
在第55-88行中有一个函数用来从源套接字读取数据并写入到日志中,最后将其发送到目的套接字。对于每个连接,有两个pass_through函数的实例,在相反方向的本地和远程之间复制数据。当一个I / O错误发生时,它被处理成连接的断开。最后,在第79行此函数中目标套接字发送确认信号到主线程,信号终止。
func pass_through(c *Channel){ from_peer := printable_addr(c.from.LocalAddr()) to_peer := printable_addr(c.to.LocalAddr()) b := make([]byte, 10240) offset := 0 packet_n := 0 for { n, err := c.from.Read(b) if err != nil { c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n",from_peer)) break } if n > 0 { c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X)%d bytes from %s\n",packet_n, offset, n, from_peer)) c.logger <- []byte(hex.Dump(b[:n])) c.binary_logger <- b[:n] c.to.Write(b[:n]) c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n",packet_n, to_peer)) offset += n packet_n += 1 } } c.from.Close() c.to.Close() c.ack <- true }
在81-107行有一个负责处理实际连接的函数。这个函数连接远程的socket(第82行),对连接计时(第88行,第101-103行),开启日志(第93-95行),最后启动两个传输数据的线程(第97-98行)。只要两个连接都还可用,pass_through就会一直执行下去。在第99-100行我们等待数据传输线程返回的确认信息。在第104-106行我们关闭日志。
func process_connection(local net.Conn, conn_n int, target string) { remote, err := net.Dial("tcp", target) if err != nil { fmt.Printf("Unable to connect to %s, %v\n", target, err) } local_info := printable_addr(remote.LocalAddr()) remote_info := printable_addr(remote.RemoteAddr()) started := time.Now() logger := make(chan []byte) from_logger := make(chan []byte) to_logger := make(chan []byte) ack := make(chan bool) go connection_logger(logger, conn_n, local_info, remote_info) go binary_logger(from_logger, conn_n, local_info) go binary_logger(to_logger, conn_n, remote_info) logger <- []byte(fmt.Sprintf("Connected to %s at %s\n", target, format_time(started))) go pass_through(&Channel{remote, local, logger, to_logger, ack}) go pass_through(&Channel{local, remote, logger, from_logger, ack}) <-ack // Make sure that the both copiers gracefully finish. <-ack // finished := time.Now() duration := finished.Sub(started) logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n", format_time(started), duration.String())) logger <- []byte{} // Stop logger from_logger <- []byte{} // Stop "from" binary logger to_logger <- []byte{} // Stop "to" binary logger }
在第108-132行是运行TCP / IP侦听的主要入口函数。在第109行,我们要求Go运行时使用所有的物理可用CPU。
func main() { runtime.GOMAXPROCS(runtime.NumCPU()) flag.Parse() if flag.NFlag() != 3 { fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_post=local_port\n") flag.PrintDefaults() os.Exit(1) } target := net.JoinHostPort(*host, *port) fmt.Printf("Start listening on port %s and forwarding data to %s\n",*listen_port, target) ln, err := net.Listen("tcp", ":"+*listen_port) if err != nil { fmt.Printf("Unable to start listener, %v\n", err) os.Exit(1) } conn_n := 1 for { if conn, err := ln.Accept(); err ==nil { go process_connection(conn, conn_n, target) conn_n += 1 } else { fmt.Printf("Accept failed, %v\n", err) } } }
这个程序只有132行。请注意:我们只使用了标准库。
现在,我们已经准备好运行:
go run gotcpspy.go -host pop.yandex.ru -port 110 -local_port 8080它应该会打印出:
Start listening on port 8080 and forwarding data to pop.yandex.ru:110然后你就可以在另一个窗口中运行:
telnet localhost 8080接下来输入,例如用户 test[ENTER]键和密码(空)[ENTER] 。三个日志文件即将被创建(当然在每人 不同 情况下 生成的时间戳不同 )。
双向十六进制日志 log-2012.04.20-19.55.17-0001-192.168.1.41 -49544-213.180.204.37-110.log:
Connected to pop.yandex.ru:110 at 2012.04.20-19.55.17 Received (#0, 00000000) 38 bytes from 192.168.1.41-49544 00000000 2b 4f 4b 20 50 4f 50 20 59 61 21 20 76 31 2e 30 |+OK POP Ya! v1.0| 00000010 2e 30 6e 61 40 32 36 20 48 74 6a 4a 69 74 63 50 |.0na@26 HtjJitcP| 00000020 52 75 51 31 0d 0a |RuQ1..| Sent (#0) to [--1]-8080 Received (#0, 00000000) 11 bytes from [--1]-8080 00000000 55 53 45 52 20 74 65 73 74 0d 0a |USER test..| Sent (#0) to 192.168.1.41-49544 Received (#1, 00000026) 23 bytes from 192.168.1.41-49544 00000000 2b 4f 4b 20 70 61 73 73 77 6f 72 64 2c 20 70 6c |+OK password, pl| 00000010 65 61 73 65 2e 0d 0a |ease...| Sent (#1) to [--1]-8080 Received (#1, 0000000B) 11 bytes from [--1]-8080 00000000 50 41 53 53 20 6e 6f 6e 65 0d 0a |PASS none..| Sent (#1) to 192.168.1.41-49544 Received (#2, 0000003D) 72 bytes from 192.168.1.41-49544 00000000 2d 45 52 52 20 5b 41 55 54 48 5d 20 6c 6f 67 69 |-ERR [AUTH] logi| 00000010 6e 20 66 61 69 6c 75 72 65 20 6f 72 20 50 4f 50 |n failure or POP| 00000020 33 20 64 69 73 61 62 6c 65 64 2c 20 74 72 79 20 |3 disabled, try | 00000030 6c 61 74 65 72 2e 20 73 63 3d 48 74 6a 4a 69 74 |later. sc=HtjJit| 00000040 63 50 52 75 51 31 0d 0a |cPRuQ1..| Sent (#2) to [--1]-8080 Disconnected from 192.168.1.41-49544 Disconnected from [--1]-8080 Finished at 2012.04.20-19.55.17, duration 5.253979s
传出数据二进制日志 log-binary-2012.04.20-19.55.17-0001 -192.168.1.41-49544.log:
USER test PASS none
传入数据二进制日志 log-binary-2012.04.20-19.55.17 -0001-213.180.204.37-110.log:
+OK POP Ya! v1.0.0na@26 HtjJitcPRuQ1 +OK password, please. -ERR [AUTH] login failure or POP3 disabled, try later. sc=HtjJitcPRuQ1
看起来有用,现在我们试试下载更大的二进制文件来测试性能,先直接下载,再通过代理。
直接下载(文件大小约72MB):
time wget http://www.erlang.org/download/otp_src_R15B01.tar.gz ... Saving to: `otp_src_R15B01.tar.gz' ... real 1m2.819s
现在,试试通过代理下载:
go run gotcpspy.go -host=www.erlang.org -port=80 -listen_port=8080
下载:
time wget http://localhost:8080/download/otp_src_R15B01.tar.gz ... Saving to: `otp_src_R15B01.tar.gz.1' ... real 0m56.209s
比较一下结果:
diff otp_src_R15B01.tar.gz otp_src_R15B01.tar.gz.1
两者匹配,程序运行正确。
现在来看看性能,我在我的 Mac Air 上将这个实验重复了几次。惊讶的是,对我来说,通过代理下载居然比直接下载还要快一些。在上面的实验中:1m2819s (直接) VS. 0m.56209s (代理)。我能想到的唯一解释就是 wget是单线程的,它在一个线程内复用输入和输出流。反过来,代理以独立线程处理每一个流,可能因此速度稍稍快一些。此中差异甚微,几乎不能察觉,或许在另一台电脑或另一个网路中,这点差异就会完全消失。主要的观察是,尽管通过代理下载会额外产生庞大的日志开销,下载速度并不会因此减慢。
综上所述,我希望你从简单和清晰的角度来看这个程序。我在上面已经指出,但我想再次强调:我已经开始逐渐在这个应用程序中使用线程(goroutine)。问题的本质只需我在一个正在运行的连接中标记并发任务,然后利用Go的易用性和安全性的并发机制已经很好地实现了它,而且我 最终不用在效率与复杂的并发(和调试的难度)顾此失彼。有时候同意一个简单的问题只需输入比特和字节, 你唯一关心的是代码的线性效率。但你在并发,多线程处理能力中遇到的逐渐增多的问题将成为关键因素,而对于这种应用,Go的魅力即将闪耀。
我希望作为一个有代表性的例子来炫耀Go的 方便甚至并发美。