Apache Thrift设计概要

jopen 10年前

最近把Apache Thrift 的Java版代码翻了一遍,尝试理解做一个RPC框架所要考虑的方方面面。

网上关于Thrift设计的文章好像不多,于是把自己的笔记整理了一下发上来。

加插招聘广告:唯品会广州总部的基础架构部招人!! 如果你喜欢纯技术的工作,对大型互联网企业的服务化平台有兴趣,愿意在架构的成长期还可以大展拳脚的时候加盟,请电邮 calvin.xiao@vipshop.com

1. Overview

Apache Thrift 的可赞之处是实现了跨超多语言(Java, C++, Go, Python,Ruby, PHP等等)的RPC框架,尽力为每种语言实现了相同抽象的RPC客户端与服务端。

简洁的四层接口抽象,每一层都可以独立的扩展增强或替换,是另一个可赞的地方。

最后,二进制的编解码方式和NIO的底层传输为它提供了不错的性能。

+-------------------------------------------+
| Server |
| (single-threaded, event-driven etc) |
+-------------------------------------------+
| Processor |
| (compiler generated) |
+-------------------------------------------+
| Protocol |
| (JSON, compact etc) |
+-------------------------------------------+
| Transport |
| (raw TCP, HTTP etc) |
+-------------------------------------------+

Transport层提供了一个简单的网络读写抽象层,有阻塞与非阻塞的TCP实现与HTTP的实现。

Protocol层定义了IDL中的数据结构与Transport层的传输数据格式之间的编解码机制。传输格式有二进制,压缩二进制,JSON等格式,IDL中的数据结构则包括Message,Struct,List,Map,Int,String,Bytes等。

Processor层由编译器编译IDL文件生成。
生成的代码会将传输层的数据解码为参数对象(比如商品对象有id与name两个属性,生成的代码会调用Protocol层的readInt与readString方法读出这两个属性值),然后调用由用户所实现的函数,并将结果编码送回。

在服务端, Server层创建并管理上面的三层,同时提供线程的调度管理。而对于NIO的实现,Server层可谓操碎了心。

在客户端, Client层由编译器直接生成,也由上面的三层代码组成。只要语言支持,客户端有同步与异步两种模式。

非死book是Thrift的原作者,还开源有NiftySwift两个子项目, Cassandra是另一个著名用户,其跨语言的Client包就是基于Thrift的,这些在下一篇文章中展开讨论。

 

2. Transport层

2.1 Transport

TTransport除了open/close/flush,最重要的方法是int read(byte[] buf, int off, int len),void write(byte[] buf, int off, int len),读出、写入固定长度的内容。

TSocket使用经典的JDK Blocking IO的Transport实现。1K的BufferedStream, TCP配置项:KeepAlive=true,TCPNoDelay=true,SoLinger=false,SocketTimeout与 ConnectionTimeout可配。

TNonblockingSocket,使用JDK NIO的Transport实现,读写的byte[]会每次被wrap成一个ByteBuffer。TCP配置项类似。
其实这个NonBlockingSocket并没有完全隔离传输层,后面异步Client或NIO的Server还有很多事情要做。

相应的,TServerSocket和TNonblockingServerSocket是ServerTransport的BIO、NIO实现,主要实现侦听端口,Accept后返回TSocket或TNonblockingSocket。其他TCP配置项:ReuseAddress=true,Backlog与SocketTimeout可配。

2.2 WrapperTransport

包裹一个底层的Transport,并利用自己的Buffer进行额外的操作。

1. TFramedTransport,按Frame读写数据。

每Frame的前4字节会记录Frame的长度(少于16M)。读的时候按长度预先将整Frame数据读入Buffer,再从Buffer慢慢读取。写的时候,每次flush将Buffer中的所有数据写成一个Frame。

有长度信息的TFramedTransport是后面NonBlockingServer粘包拆包的基础。

2. TFastFramedTransport 与TFramedTransport相比,始终使用相同的Buffer,提高了内存的使用率。

TFramedTransport的ReadBuffer每次读入Frame时都会创建新的byte[],WriteBuffer每次flush时如果大于初始1K也会重新创建byte[]。

而TFastFramedTransport始终使用相同的ReadBuffer和WriteBuffer,都是1K起步,不够时自动按1.5倍增长,和NIO的ByteBuffer一样,加上limit/pos这样的指针,每次重复使用时设置一下它们。

3. TZlibTransport ,读取时按1K为单位将数据读出并调用JDK的zip函数进行解压再放到Buffer,写入时,在flush时先zip再写入。

4. TSaslClientTransport与TSaslServerTransport, 提供SSL校验。

 

3. Protocol层

3.1 IDL定义

Thrift支持结构见 http://thrift.apache.org/docs/types

* 基本类型: i16,i32,i64, double, boolean,byte,byte[], String。
* 容器类型: List,Set,Map,TList/TSet/TMap类包含其元素的类型与元素的总个数。
* Struct类型,即面向对象的Class,继承于TBase。TStruct类有Name属性,还含有一系列的Field。TField类有自己的Name,类型,顺序id属性。
* Exception类型也是个Struct,继承于TException这个checked exception。
* Enum类型传输时是个i32。
* Message类型封装往返的RPC消息。TMessage类包含Name,类型(请求,返回,异常,ONEWAY)与seqId属性。

相应的,Protocol 层对上述数据结构有read与write的方法。
对基本类型是直接读写,对结构类型则是先调用readXXXBegin(),再调用其子元素的read()方法,再调用readXXXEnd()。
在所有函数中,Protocol层会直接调用Transport层读写特定长度的数据。

3.2 TBinaryProtocol

如前所述,i16,i32, double这些原始类型都是定长的,String,byte[]会在前4个字节说明自己的长度,容器类,Strutct类所对应的 TMap,TStruct,TField,TMessage里有如前所述的属性 (不过Struct与Field里的name属性会被skip),所以其实现实可以简单想象的。

3.3 TCompactProtocol

比起TBinaryProtocol,会想方设法省点再省点。

1. 对整数类型使用了 ZigZag 压缩算法,数值越小压的越多。比如 i32 类型的整数本来是4个字节, 可以压缩成 1~5 字节不等。而 i64类型的整数本来是8个字节,可以压缩成 1~10 字节不等。 因此,值小的i32和i64,和小的Collection和短的String(因为它们都有定义长度的int属性) 越多,它就能省得越多。

2. 它还会尝试将Field的fieldId和type挤在一个byte里写。原本field是short,type也占一个byte,合共三个byte。 它将1个byte拆成两组4bit,前4bit放与前一个field的Id的delta值(不能相差超过15,如果中间太多没持久化的optional field),后4bit放type(目前刚好16种type,把byte[]和String合成一种了)

3.4 TTupleProtocol

继承于TCompactProtocol,Struct的编解码时使用更省地方但IDL间版本不兼容的TupleScheme,见Processor层。

3.5 TJSONProtocol

与我们平时的Restful JSON还是有点区别,具体的定义看cpp实现的TJSONProtool.h文件

对于容器类和Message,会用数组的方式,前几个元素是元信息,后面才是value,比如List是[type,size,[1,2,3]],

对于Struct,会是个Map,Key是fieldId而不是name(为了节省空间?),Value又是一个Map,只有一个Key-Value Pair,Key是type,Value才是真正value。

 

4. Processor层

建议使用一个最简单的IDL文件,用Windows版的Generator生成一个来进行观察。

4.1 基础接口

TBase是大部分生成的Struct,参数类,结果类的接口,最主要是实现从Protocol层读写自己的函数。

TProcessFunction是生成的服务方法类的基类,它的process函数会完成如下步骤:

1. 调用生成的args对象的read方法从protocol层读出自己
2. 调用子类生成的getResult()方法:拆分args对象得到参数,调用真正的用户实现得到结果,并组装成生成的result对象。
3. 写消息头,
4. 调用生成的result对象的write方法将自己写入protocol层
5 调用transport层的flush()。

TBaseProcessor 只有 boolean process(TProtocol in, TProtocol out) 这个简单接口,会先调readMessageBegin(),读出消息名,再从processMap里找出相应的TProcessFunction调用。

TMultiplexedProcessor,支持一个Server支持部署多个Service的情况,在Processor外面再加一层Map,消息名从“add”会变为“Caculator:add”,当然,这需要需要客户端发送时使用 TMultiplexedProtocol修饰原来的Protocol来实现。

4.2 代码生成

1. IFace接口

接口里的函数名不能重名,即使参数不一样也不行。
如果参数是个Struct,则会直接使用继承于TBase的生成类,对客户代码有一定侵入性。
默认抛出TException 这个checked exception,可自定义继承于TException的其他Exception类。

2. Processor类

继承于TBaseProcessor,简单构造出processMap,构造时需要传入用户实现的IFace实现类。

3. 接口里所有方法的方法类

继承于TProcessFunction,见前面TProcessFunction的描述。

4. Struct类,方法参数类和方法结果类

继承于Base。

读取Struct时,遇上fieldId为未知的,或者类型不同于期望类型的field,会被Skip掉。所谓skip,就是只按传过来的type,读取其内容推动数据流往前滚动,但不往field赋值,这也是为什么有了生成的代码,仍然要传输元数据的原因。

为了保持不同服务版本间的兼容性,永远对方法的参数与Struct的field只增不减不改就对了。

因为不同版本间,Struct的filed的数量未知,而StructEnd又无特殊标志,所以在Struct最后会放一个type=Stop的filed,读到则停止Struct的读入。

写入Struct时,Java对象(如String,Struct)或者设为optional的原始类型(如int)会先判断一下这个值被设置没有。Java对象只要判断其是否为null,原始类型就要额外增加一个bitset来记录该field是否已设置,根据field的数量,这个 bitset是byte(8个)或short(16个)。

以上是StandardScheme的行为,每个Struct还会生成一种更节约空间但服务版本间不兼容的TupleScheme,它不再传输 fieldId与type的元数据,只在Struct头写入一个bitset表明哪几个field有值,然后直接用生成的代码读取这些有值的field。所以如果新版idl中filed类型改动将出错;新的field也不会被读取数据流没有往前滚动,接下来也是错。

5. Thrift二进制与Restful JSON的编码效率对比

为了服务版本兼容,Thrift仍然需要传输数字型的fieldId,但比JSON的fieldName省地方。
int与byte[],当然比JSON的数字字符串和BASE64编码字符串省地方。
省掉了’:’和’,'
省掉了””,[], {}, 但作为代价,字符串,byte[],容器们都要有自己的长度定义,Struct也要有Stop Field。
但比起JSON,容器和Field又额外增加了类型定义的元数据。

 

5. 服务端

基类TServer相当于一个容器,拥有生产TProcessor、TTransport、TProtocol的工厂对象。改变这些工厂类,可以修饰包裹Transport与Protocol类,改变TProcessor的单例模式或与Spring集成等等。

https://github.com/m1ch1/mapkeeper/wiki/Thrift-Java-Servers-Compared 这篇文章比较了各种Server

5.1 Blocking Server

TSimpleServer同时只能处理一个Client连接,只是个玩具。

TThreadPoolServer才是典型的多线程处理的Blocking Server实现。
线程池类似 Executors.newCachedThreadPool(),可设最小最大线程数(默认是5与无限)。
每条线程处理一个Client,如果所有线程都在忙,会等待一个random的时间重试直到设定的requestTimeout。
线程对于Client好像没有断开连接的机制,只靠捕获TTransportException来停止服务?

5.2 NonBlockingServer

TThreadedSelectorServer有一条线程专门负责accept,若干条Selector线程处理网络IO,一个Worker线程池处理消息。

THsHaServer只有一条AcceptSelect线程处理关于网络的一切,一个Worker线程池处理消息。

TNonblockingServer只有一条线程处理一切。

很明显TThreadedSelectorServer是被使用得最多的,因为在多核环境下多条Selector线程的表现会更好。所以只对它展开细读。

5.3 TThreadedSelectorServer

TThreadedSelectorServer属于 Half-Sync/Half-Async模式。

AcceptThread线程使用TNonblockingServerTransport执行accept操作,将accept到的Transport round-robin的交给其中一条SelectorThread。
因为SelectorThread自己也要处理IO,所以AcceptThread是先扔给SelectorThread里的Queue(默认长度只有4,满了就要阻塞等待)。

SelectorThread每个循环各执行一次如下动作
1. 注册Transport
2. select()处理IO
3. 处理FrameBuffer的状态变化

注册Transport时,在Selector中注册OP_READ,并创建一个带状态机 (READING_FRAME_SIZE,READING_FRAME,READ_FRAME_COMPLETE, AWAITING_REGISTER_WRITE等)的FrameBuffer类与其绑定。

客户端必须使用FrameTransport(前4个字节记录Frame的长度)来发送数据以解决粘包拆包问题。

SelectorThread在每一轮的select()中,对有数据到达的Transport,其FrameBuffer先读取Frame的长度,然后创建这个长度的ByteBuffer继续读取数据,读满了就交给Worker线程中的Processor处理,没读够就继续下一轮循环。

当交给Processor处理时,Processor不像Blocking Server那样直接和当前的Transport打交道,而是实际将已读取到的Frame数据转存到一个MemoryTransport中,output 时同样只是写到一个由内存中的ByteArray(初始大小为32)打底的OutputStream。

Worker线程池用newFixedThreadPool()创建,Processor会解包,调用用户实现,再把结果编码发送到前面传入的那个 ByteArray中。FrameBuffer再把ByteArray转回ByteBuffer,状态转为 AWAITING_REGISTER_WRITE,并在SelectorThread中注册该变化。

回到SelectorThread中,发现FrameBuffer的当前状态为AWAITING_REGISTER_WRITE,在 Selector中注册OP_WRITE,等待写入的机会。在下一轮循环中就会开始写入数据,写完的话FrameBuffer又转到 READING_FRAME_SIZE的状态,在Selector中重新注册OP_READ。

还有更多的状态机处理,略。

 

6. 客户端

6.1 同步客户端

同样通过生成器生成,其中Client类继承TClient基类实现服务的同步接口。

int add(int num1,int num2)会调用生成的send_add(num1, num2) 与 int receive_add()。
send_add()构造add_args()对象,调用父类的sendBase(“add",args),sendBase()调用protocol的 writeMessage写入消息头,然后调用args自己的write(protocol),然后调用transport的flush()发送。
receive_add()构造add_result,调用父类的receive_base(add_result),receive_base()调用 protocol层的readMessage读出消息头,如果类型是Exception则 用TApplicationException类来读取消息,否则调用result类的read()函数。

注意这里的seq_id并不支持并发访问,在发送时简单的+1,在接收时再进行比较,如果不对则会报错。因此Client好像是非线程安全的。

6.2 异步客户端

异步客户端,需要传入一个CallBack实现,在收到返回结果或错误时调用。

使用NonblockingSocket,每个客户端会起一条线程,在这条线程里忙活所有消息Non blocking发送,接收,编解码及调用Callback类,还有超时调用的处理。
其状态机的写法,TThreadedSelectorServer有相似之处,略。

 

7. Http

走Http协议,主要是利用了Thrift的二进制编解码机制,而放弃了它的底层NIO传输与服务线程模型。

THttpClient,使用Apache HttpClient或JDK的HttpURLConnection Post内容。

TServlet,作为Servlet,将request与response的stream交给Processor处理即可,不用处理线程模型与NIO,很简单。

 

8. Generator

Generator用C++编写,有Parser类分析thrift文件元数据模型,然后每种语言有自己的生成模型。

为Java生成代码的t_java_generator.cc有五千多行,不过很规整很容易看。

除了Java,一般还会生成Html的API描述文档。

谢谢你看到这里,请再看一次招聘广告:

唯品会广州总部的基础架构部招人!! 如果你喜欢纯技术的工作,对大型互联网企业的服务化平台有兴趣,愿意在架构的成长期还可以大展拳脚的时候加盟,请电邮 calvin.xiao@vipshop.com

来自:http://calvin1978.blogcn.com/articles/apache-thrift.html