Java NIO Socket通信

jopen 12年前

一 套接字通道

1. 阻塞式套接字通道

    与Socket和ServerSocket对应,NIO提供了SocketChannel和ServerSocketChannel对应,这两种通道同时支持一般的阻塞模式和更高效的非阻塞模式。

    客户端通过SocketChannel.open()方法打开一个Socket通道,如果此时提供了SocketAddress参数,则会自动开始连接,否则需要主动调用connect()方法连接,创建连接后,可以像一般的Channel一样的用Buffer进行读写,这都是阻塞模式的。

    服务器端通过ServerSocketChannel.open()创建,并使用bind()方法绑定到一个监听地址上,最后调用accept()方法阻塞等待客户端连接。当客户端连接后会返回一个SocketChannel以实现与客户端的读写交互。

    总的来说,阻塞模式即是net包I/O的翻版,只是采用Channel和Buffer实现而已。

2.多路复用套接字通道(Selector实现的非阻塞式IO)

    套接字通道多路复用的思想是创建一个Selector,将多个通道对它进行注册,当套接字有关注的事件发生时,可以选出这个通道进行操作。

    服务器端的代码如下,相关说明就带在注释里了:

// 创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程  Selector selector = Selector.open();  // 创建ServerSocketChannel,并把它绑定到指定端口上  ServerSocketChannel server = ServerSocketChannel.open();  server.bind(new InetSocketAddress("127.0.0.1", 7777));  // 设置为非阻塞模式, 这个非常重要  server.configureBlocking(false);  // 在选择器里面注册关注这个服务器套接字通道的accept事件  // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel  server.register(selector, SelectionKey.OP_ACCEPT);  while (true) {   // 测试等待事件发生,分为直接返回的selectNow()和阻塞等待的select(),另外也可加一个参数表示阻塞超时   // 停止阻塞的方法有两种: 中断线程和selector.wakeup(),有事件发生时,会自动的wakeup()   // 方法返回为select出的事件数(参见后面的注释有说明这个值为什么可能为0).   // 另外务必注意一个问题是,当selector被select()阻塞时,其他的线程调用同一个selector的register也会被阻塞到select返回为止   // select操作会把发生关注事件的Key加入到selectionKeys中(只管加不管减)   if (selector.select() == 0) { //    continue;   }     // 获取发生了关注时间的Key集合,每个SelectionKey对应了注册的一个通道   Set<SelectionKey> keys = selector.selectedKeys();   // 多说一句selector.keys()返回所有的SelectionKey(包括没有发生事件的)   for (SelectionKey key : keys) {    // OP_ACCEPT 这个只有ServerSocketChannel才有可能触发    if (key.isAcceptable()) {     // 得到与客户端的套接字通道     SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();     // 同样设置为非阻塞模式     channel.configureBlocking(false);     // 同样将于客户端的通道在selector上注册,OP_READ对应可读事件(对方有写入数据),可以通过key获取关联的选择器     channel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024));    }    // OP_READ 有数据可读    if (key.isReadable()) {     SocketChannel channel = (SocketChannel) key.channel();     // 得到附件,就是上面SocketChannel进行register的时候的第三个参数,可为随意Object     ByteBuffer buffer = (ByteBuffer) key.attachment();     // 读数据 这里就简单写一下,实际上应该还是循环读取到读不出来为止的     channel.read(buffer);     // 改变自身关注事件,可以用位或操作|组合时间     key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);    }    // OP_WRITE 可写状态 这个状态通常总是触发的,所以只在需要写操作时才进行关注    if (key.isWritable()) {     // 写数据掠过,可以自建buffer,也可用附件对象(看情况),注意buffer写入后需要flip     // ......     // 写完就吧写状态关注去掉,否则会一直触发写事件     key.interestOps(SelectionKey.OP_READ);    }      // 由于select操作只管对selectedKeys进行添加,所以key处理后我们需要从里面把key去掉    keys.remove(key);   }  }

这里需要着重说明一下select操作做了什么(根据现象推的,具体好像没有找到这个的文档说明),他每次检查keys里面每个Key对应的通道的状态,如果有关注状态时,就决定返回,这时会同时将Key对象加入到selectedKeys中,并返回selectedKeys本次变化的对象数(原本就在selectedKeys中的对象是不计的),由于一个Key对应一个通道(可能同时处于多个状态,所以注意上面的if语句我都没有写else),所以select返回0也是有可能的。另外OP_WRITE和OP_CONNET这两个状态是不能长期关注的,只在有需要的时候监听,处理完必须马上去掉。如果没有发现有任何关注状态,select会一直阻塞到有状态变化或者超时什么的。

SelectionKey的其他几个方法,attach(Object)为key设置附件,并返回之前的附件;interestOps()和readyOps()返回关注状态和当前状态;cancel()为取消注册;isValid()表示key是否有效(在key取消注册,通道关闭,选择器关闭这三个事情发生之前,key均为有效的,但不包括对方关闭通道,所以读写应注意异常)。

    还有一个状态上面没有使用,OP_CONNECT这个主要是用于客户端,对应的key的方法是isConnectable()表示已经创建好了连接。

非阻塞实现的客户端如下:

Selector selector = Selector.open();  // 创建一个套接字通道,注意这里必须使用无参形式  SocketChannel channel = SocketChannel.open();  // 设置为非阻塞模式,这个方法必须在实际连接之前调用(所以open的时候不能提供服务器地址,否则会自动连接)  channel.configureBlocking(false);  // 连接服务器,由于是非阻塞模式,这个方法会发起连接请求,并直接返回false(阻塞模式是一直等到链接成功并返回是否成功)  channel.connect(new InetSocketAddress("127.0.0.1", 7777));  // 注册关联链接状态  channel.register(selector, SelectionKey.OP_CONNECT);  while (true) {   // 前略 和服务器端的类似   // ...   // 获取发生了关注时间的Key集合,每个SelectionKey对应了注册的一个通道   Set<SelectionKey> keys = selector.selectedKeys();   for (SelectionKey key : keys) {    // OP_CONNECT 两种情况,链接成功或失败这个方法都会返回true    if (key.isConnectable()) {     // 由于非阻塞模式,connect只管发起连接请求,finishConnect()方法会阻塞到链接结束并返回是否成功     // 另外还有一个isConnectionPending()返回的是是否处于正在连接状态(还在三次握手中)     if (channel.finishConnect()) {      // 链接成功了可以做一些自己的处理,略      // ...      // 处理完后必须吧OP_CONNECT关注去掉,改为关注OP_READ      key.interestOps(SelectionKey.OP_READ);     }    }    // 后略 和服务器端的类似    // ...   }  }

    虽然例子是这样的,不过服务器和客户端可以自己单方面选择是否采用非阻塞模式,用阻塞模式的客户端连接非阻塞模式的服务器端是OK的。

二 NIO2的异步IO通道

以下API是由Java7提供。老版本无法使用。

    异步IO通道的实现有两种实现方式,一是在阻塞模式的原方法(主要指的是read和write,具体可以查看API文档)上传于一个CompletionHandler实例以实现回调,另外也可以令其返回一个Future实例(Java5新增同步工具包java.util.concurrent中的API),然后再适当的时候通过其get方法来获取返回的结果。异步文件I/O通道为AsynchronousFileChannel,而异步套接字通道为AsynchronousServerSocketChannel,分别对应其各自的原始通道。

    异步I/O需要与一个AsynchronousChannelGroup对象关联,他实质上就是一个用于I/O的线程池。AsynchronousChannelGroup对象可以通过其自身静态方法的withThreadPool(),withCachedThreadPool(),withFixedThreadPool()提供一个线程池来创建(线程池也是Java5新增同步工具包java.util.concurrent中的API)。在异步通道创建open()时,可将这个对象传入进行关联。如果没有提供这个对象的话,就默认使用系统分组。但是需要注意的是系统分组的线程池是个守护线程池,JVM是可能在没有读写完成前正常结束的。AsynchronousChannelGroup在使用完后需要shutdowm(),这方面和线程池的关闭是类似的。