Go 语言中漂亮的并发特性

jopen 12年前

时不时地学习一门新的编程语言对你来说是大有裨益的,哪怕这门语言并不那么成功甚至有些过时。用新的编程语言来解决老的编程问题会迫使你对自己的某些观点、方法甚至是习惯进行重新思考。

我喜欢尝试新的东西,尤其是编程语言。但是如果只编写了“Hellow World"或者实现了Fibonacci数列,你通常会对这门语言毫无感觉、甚至觉得索然寡味。你应该去试着实现Eratosthenes筛法,尝试一些数据结构或者感受一下它的性能表现。但是我想要的是更加现实的、甚至能够为以后复用的代码。所以不久前我为自己出了个题目,来帮助我只通过几百行代码就可以体会到新的编程语言的感觉。

这个项目涉及一个语言中的几个基本元素:字符串,文件,网络I/O,当然还有并发性。这个项目叫做TCP/IP代理(或者你可以叫做网络调试器)。它的理念是,你拥有一个TCP/IP监听器(单线程或多线程)监听一个指定的端口并接受外来的连接。当他接受到一个接入请求,它就会建立一个连接,并且在远程客户端与服务器之间做双向数据传输。另外,这个代理可以记录日志,以不同的格式来记录,以便日后做分析。

当我需要这个工具的时候,我不再需要到处去找。每次涉及网络编程的时候,这样一个工具是必须的。我已经使用过不同的语言来实现这个工具,包括C, C++, Perl, PHP。最近的两个实现是使用python 和 Erlang。它代表着我正在寻找的答案。

我们可以再具体说说我们的需求。这个应用必须支持同时建立多个连接。对于每一个连接,它需要通过3种途径记录数据:一个以十六进制顺序记录来自于双向传输的数据日志,两个用于分开记录进和出的数据流的二进制日志。

我们在这篇文章中将实现这个程序,我们使用的语言是Go。Go的作者声称,Go骨子里就渗透着并发和多线程特性。我想把它带到我们的世界。

如果我通过C++来实现,我可能就需要main监听线程,和每个连接的线程。所以,单独一个连接就可以通过一个线程,而得到完整的服务(I/O和日志记录)。

以下是我在Go实现中用于服务每个连接的线程:

  • 一个双向十六进制日志记录线程
  • 两个以二进制记录进和出的数据流的线程
  • 两个用于在服务器和远程主机间传输数据的线程

总共5个线程。

5个线程在为一个独立的连接服务。我实现了这些线程,不是为了多线程本身,而是因为Go鼓励多线程,而C++不鼓励(及时最新的C++x11标准也类似)。多线程在Go中是如此自然而简单。我在Go语言中实现TCP/IP代理,没有使用锁和条件变量。同步由Go的channel方式进行优雅的管理。

好吧,这里有源代码,包含解释。如果你不熟悉Go,注释会有帮助。我的本意不仅关注于这个程序的功能,也关注Go语言本身。

现在开始

从 2-11 行我们声明了一些要用到的包。值得注意的是,如果引入的包没有用到,Go 视之为一个错误并且强制删除没用的声明(在C++项目中,你最后完成了,记得什么时候清理过 STL 的头文件吗?)

package main    import (      "encoding/hex"      "flag"      "fmt"      "net"      "os"      "runtime"      "strings"      "time"  )
从 12-16行我们声明了一些代表命令行标志的全局变量。后面,我们会看到如何解析他们。
var (      host *string = flag.String("host", "","target host or address")      port *string = flag.String("port", "0", "target port")      listen_port *string = flag.String("listen_port", "0","listen port")  )

从 17-20 行我们看到Go语言中可变参数函数的语法结构。

 
func die(format string, v ...interface{}) {       os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...))       os.Exit(1)  }

从 21-28 行有两个函数分别启动十六进制数据日志和二进制流日志。他们唯一的区别是日志名称不同。

 
func connection_logger(data chan []byte, conn_n int,    local_info, remote_info string) {      log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log",format_time(time.Now()), conn_n, local_info, remote_info)      logger_loop(data, log_name)  }  func binary_logger(data chan []byte, conn_n int, peer string) {      log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log",    format_time(time.Now()), conn_n, peer)      logger_loop(data, log_name)  }

在第 29-43 行,你能看到 Go 真正的乐趣。函数 logger_loop 创建一个日志文件,然无限循环的写入(35-42行)。在第36行,代码等待来自通道 data 的消息。在第34行有一个很很有趣的技巧。操作符 defer 允许我们定义一段在函数功能结束时执行的代码(类似Java中的 finally)。如果接受到空数据,函数将退出。

func logger_loop(data chan []byte, log_name string) {      f, err := os.Create(log_name)      if err != nil {          die("Unable to create file %s, %v\n", log_name, err)      }      defer f.Close()      for {          b := <-data          if len(b) == 0 {               break          }          f.Write(b)          f.Sync()      }  }  func format_time(t time.Time) string {      return t.Format("2006.01.02-15.04.05")  }  func printable_addr(a net.Addr) string {      return strings.Replace(a.String(), ":", "-", -1)  }  type Channel struct {      from, to net.Conn      logger, binary_logger chan []byte      ack chan bool  }

在第55-88行中有一个函数用来从源套接字读取数据并写入到日志中,最后将其发送到目的套接字。对于每个连接,有两个pass_through函数的实例,在相反方向的本地和远程之间复制数据。当一个I / O错误发生时,它被处理成连接的断开。最后,在第79行此函数中目标套接字发送确认信号到主线程,信号终止。

func pass_through(c *Channel){       from_peer := printable_addr(c.from.LocalAddr())      to_peer := printable_addr(c.to.LocalAddr())          b := make([]byte, 10240)      offset := 0      packet_n := 0      for {          n, err := c.from.Read(b)          if err != nil {              c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n",from_peer))              break          }          if n > 0 {              c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X)%d bytes from %s\n",packet_n, offset, n, from_peer))              c.logger <- []byte(hex.Dump(b[:n]))              c.binary_logger <- b[:n]              c.to.Write(b[:n])              c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n",packet_n, to_peer))              offset += n              packet_n += 1          }      }      c.from.Close()      c.to.Close()      c.ack <- true  }

在81-107行有一个负责处理实际连接的函数。这个函数连接远程的socket(第82行),对连接计时(第88行,第101-103行),开启日志(第93-95行),最后启动两个传输数据的线程(第97-98行)。只要两个连接都还可用,pass_through就会一直执行下去。在第99-100行我们等待数据传输线程返回的确认信息。在第104-106行我们关闭日志。

func process_connection(local net.Conn, conn_n int, target string) {      remote, err := net.Dial("tcp", target)      if err != nil {          fmt.Printf("Unable to connect to %s, %v\n", target, err)      }            local_info := printable_addr(remote.LocalAddr())      remote_info := printable_addr(remote.RemoteAddr())            started := time.Now()            logger := make(chan []byte)      from_logger := make(chan []byte)      to_logger := make(chan []byte)      ack := make(chan bool)            go connection_logger(logger, conn_n, local_info, remote_info)      go binary_logger(from_logger, conn_n, local_info)      go binary_logger(to_logger, conn_n, remote_info)            logger <- []byte(fmt.Sprintf("Connected to %s at %s\n",      target, format_time(started)))            go pass_through(&Channel{remote, local, logger, to_logger, ack})      go pass_through(&Channel{local, remote, logger, from_logger, ack})      <-ack // Make sure that the both copiers gracefully finish.      <-ack //            finished := time.Now()      duration := finished.Sub(started)      logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n",      format_time(started), duration.String()))            logger <- []byte{} // Stop logger      from_logger <- []byte{} // Stop "from" binary logger      to_logger <- []byte{} // Stop "to" binary logger  }

在第108-132行是运行TCP / IP侦听的主要入口函数。在第109行,我们要求Go运行时使用所有的物理可用CPU。

func main() {      runtime.GOMAXPROCS(runtime.NumCPU())      flag.Parse()      if flag.NFlag() != 3 {          fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_post=local_port\n")          flag.PrintDefaults()          os.Exit(1)      }      target := net.JoinHostPort(*host, *port)      fmt.Printf("Start listening on port %s and forwarding data to %s\n",*listen_port, target)      ln, err := net.Listen("tcp", ":"+*listen_port)      if err != nil {          fmt.Printf("Unable to start listener, %v\n", err)          os.Exit(1)      }      conn_n := 1      for {          if conn, err := ln.Accept(); err ==nil {              go process_connection(conn, conn_n, target)              conn_n += 1          } else {         fmt.Printf("Accept failed, %v\n", err)        }      }  }

这个程序只有132行。请注意:我们只使用了标准库。
现在,我们已经准备好运行:

go run gotcpspy.go -host pop.yandex.ru -port 110 -local_port 8080
它应该会打印出:
Start listening on port 8080 and forwarding data to pop.yandex.ru:110
然后你就可以在另一个窗口中运行:
telnet localhost 8080
接下来输入,例如用户 test[ENTER]键和密码(空)[ENTER] 。三个日志文件即将被创建(当然在每人 不同 情况下 生成的时间戳不同 )。

双向十六进制日志 log-2012.04.20-19.55.17-0001-192.168.1.41 -49544-213.180.204.37-110.log:

Connected to pop.yandex.ru:110 at 2012.04.20-19.55.17  Received (#0, 00000000) 38 bytes from 192.168.1.41-49544  00000000 2b 4f 4b 20 50 4f 50 20 59 61 21 20 76 31 2e 30  |+OK POP Ya! v1.0|  00000010 2e 30 6e 61 40 32 36 20 48 74 6a 4a 69 74 63 50  |.0na@26 HtjJitcP|  00000020 52 75 51 31 0d 0a  |RuQ1..|  Sent (#0) to [--1]-8080  Received (#0, 00000000) 11 bytes from [--1]-8080  00000000 55 53 45 52 20 74 65 73 74 0d 0a  |USER test..|  Sent (#0) to 192.168.1.41-49544  Received (#1, 00000026) 23 bytes from 192.168.1.41-49544  00000000 2b 4f 4b 20 70 61 73 73 77 6f 72 64 2c 20 70 6c  |+OK password, pl|  00000010 65 61 73 65 2e 0d 0a  |ease...|  Sent (#1) to [--1]-8080  Received (#1, 0000000B) 11 bytes from [--1]-8080  00000000 50 41 53 53 20 6e 6f 6e 65 0d 0a  |PASS none..|  Sent (#1) to 192.168.1.41-49544  Received (#2, 0000003D) 72 bytes from 192.168.1.41-49544  00000000 2d 45 52 52 20 5b 41 55 54 48 5d 20 6c 6f 67 69  |-ERR [AUTH] logi|  00000010 6e 20 66 61 69 6c 75 72 65 20 6f 72 20 50 4f 50  |n failure or POP|  00000020 33 20 64 69 73 61 62 6c 65 64 2c 20 74 72 79 20  |3 disabled, try |  00000030 6c 61 74 65 72 2e 20 73 63 3d 48 74 6a 4a 69 74  |later. sc=HtjJit|  00000040 63 50 52 75 51 31 0d 0a  |cPRuQ1..|  Sent (#2) to [--1]-8080  Disconnected from 192.168.1.41-49544  Disconnected from [--1]-8080  Finished at 2012.04.20-19.55.17, duration 5.253979s

传出数据二进制日志 log-binary-2012.04.20-19.55.17-0001 -192.168.1.41-49544.log:

USER test  PASS none

传入数据二进制日志 log-binary-2012.04.20-19.55.17 -0001-213.180.204.37-110.log:

+OK POP Ya! v1.0.0na@26 HtjJitcPRuQ1  +OK password, please.    -ERR [AUTH] login failure or POP3 disabled, try later. sc=HtjJitcPRuQ1

看起来有用,现在我们试试下载更大的二进制文件来测试性能,先直接下载,再通过代理。

直接下载(文件大小约72MB):

time wget http://www.erlang.org/download/otp_src_R15B01.tar.gz     ...    Saving to: `otp_src_R15B01.tar.gz'     ...    real 1m2.819s

现在,试试通过代理下载:

go run gotcpspy.go -host=www.erlang.org -port=80 -listen_port=8080

下载:

time wget http://localhost:8080/download/otp_src_R15B01.tar.gz    ...    Saving to: `otp_src_R15B01.tar.gz.1'    ...    real 0m56.209s

比较一下结果:

diff otp_src_R15B01.tar.gz otp_src_R15B01.tar.gz.1

两者匹配,程序运行正确。

现在来看看性能,我在我的 Mac Air 上将这个实验重复了几次。惊讶的是,对我来说,通过代理下载居然比直接下载还要快一些。在上面的实验中:1m2819s (直接) VS. 0m.56209s (代理)。我能想到的唯一解释就是 wget是单线程的,它在一个线程内复用输入和输出流。反过来,代理以独立线程处理每一个流,可能因此速度稍稍快一些。此中差异甚微,几乎不能察觉,或许在另一台电脑或另一个网路中,这点差异就会完全消失。主要的观察是,尽管通过代理下载会额外产生庞大的日志开销,下载速度并不会因此减慢。

综上所述,我希望你从简单和清晰的角度来看这个程序。我在上面已经指出,但我想再次强调:我已经开始逐渐在这个应用程序中使用线程(goroutine)。问题的本质只需我在一个正在运行的连接中标记并发任务,然后利用Go的易用性和安全性的并发机制已经很好地实现了它,而且我 最终不用在效率与复杂的并发(和调试的难度)顾此失彼。

有时候同意一个简单的问题只需输入比特和字节, 你唯一关心的是代码的线性效率。但你在并发,多线程处理能力中遇到的逐渐增多的问题将成为关键因素,而对于这种应用,Go的魅力即将闪耀。

我希望作为一个有代表性的例子来炫耀Go的 方便甚至并发美。