Java NIO Socket通信
一 套接字通道
1. 阻塞式套接字通道
与Socket和ServerSocket对应,NIO提供了SocketChannel和ServerSocketChannel对应,这两种通道同时支持一般的阻塞模式和更高效的非阻塞模式。
客户端通过SocketChannel.open()方法打开一个Socket通道,如果此时提供了SocketAddress参数,则会自动开始连接,否则需要主动调用connect()方法连接,创建连接后,可以像一般的Channel一样的用Buffer进行读写,这都是阻塞模式的。
服务器端通过ServerSocketChannel.open()创建,并使用bind()方法绑定到一个监听地址上,最后调用accept()方法阻塞等待客户端连接。当客户端连接后会返回一个SocketChannel以实现与客户端的读写交互。
总的来说,阻塞模式即是net包I/O的翻版,只是采用Channel和Buffer实现而已。
2.多路复用套接字通道(Selector实现的非阻塞式IO)
套接字通道多路复用的思想是创建一个Selector,将多个通道对它进行注册,当套接字有关注的事件发生时,可以选出这个通道进行操作。
服务器端的代码如下,相关说明就带在注释里了:
// 创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程 Selector selector = Selector.open(); // 创建ServerSocketChannel,并把它绑定到指定端口上 ServerSocketChannel server = ServerSocketChannel.open(); server.bind(new InetSocketAddress("127.0.0.1", 7777)); // 设置为非阻塞模式, 这个非常重要 server.configureBlocking(false); // 在选择器里面注册关注这个服务器套接字通道的accept事件 // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel server.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 测试等待事件发生,分为直接返回的selectNow()和阻塞等待的select(),另外也可加一个参数表示阻塞超时 // 停止阻塞的方法有两种: 中断线程和selector.wakeup(),有事件发生时,会自动的wakeup() // 方法返回为select出的事件数(参见后面的注释有说明这个值为什么可能为0). // 另外务必注意一个问题是,当selector被select()阻塞时,其他的线程调用同一个selector的register也会被阻塞到select返回为止 // select操作会把发生关注事件的Key加入到selectionKeys中(只管加不管减) if (selector.select() == 0) { // continue; } // 获取发生了关注时间的Key集合,每个SelectionKey对应了注册的一个通道 Set<SelectionKey> keys = selector.selectedKeys(); // 多说一句selector.keys()返回所有的SelectionKey(包括没有发生事件的) for (SelectionKey key : keys) { // OP_ACCEPT 这个只有ServerSocketChannel才有可能触发 if (key.isAcceptable()) { // 得到与客户端的套接字通道 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); // 同样设置为非阻塞模式 channel.configureBlocking(false); // 同样将于客户端的通道在selector上注册,OP_READ对应可读事件(对方有写入数据),可以通过key获取关联的选择器 channel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } // OP_READ 有数据可读 if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); // 得到附件,就是上面SocketChannel进行register的时候的第三个参数,可为随意Object ByteBuffer buffer = (ByteBuffer) key.attachment(); // 读数据 这里就简单写一下,实际上应该还是循环读取到读不出来为止的 channel.read(buffer); // 改变自身关注事件,可以用位或操作|组合时间 key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } // OP_WRITE 可写状态 这个状态通常总是触发的,所以只在需要写操作时才进行关注 if (key.isWritable()) { // 写数据掠过,可以自建buffer,也可用附件对象(看情况),注意buffer写入后需要flip // ...... // 写完就吧写状态关注去掉,否则会一直触发写事件 key.interestOps(SelectionKey.OP_READ); } // 由于select操作只管对selectedKeys进行添加,所以key处理后我们需要从里面把key去掉 keys.remove(key); } }这里需要着重说明一下select操作做了什么(根据现象推的,具体好像没有找到这个的文档说明),他每次检查keys里面每个Key对应的通道的状态,如果有关注状态时,就决定返回,这时会同时将Key对象加入到selectedKeys中,并返回selectedKeys本次变化的对象数(原本就在selectedKeys中的对象是不计的),由于一个Key对应一个通道(可能同时处于多个状态,所以注意上面的if语句我都没有写else),所以select返回0也是有可能的。另外OP_WRITE和OP_CONNET这两个状态是不能长期关注的,只在有需要的时候监听,处理完必须马上去掉。如果没有发现有任何关注状态,select会一直阻塞到有状态变化或者超时什么的。
SelectionKey的其他几个方法,attach(Object)为key设置附件,并返回之前的附件;interestOps()和readyOps()返回关注状态和当前状态;cancel()为取消注册;isValid()表示key是否有效(在key取消注册,通道关闭,选择器关闭这三个事情发生之前,key均为有效的,但不包括对方关闭通道,所以读写应注意异常)。
还有一个状态上面没有使用,OP_CONNECT这个主要是用于客户端,对应的key的方法是isConnectable()表示已经创建好了连接。
非阻塞实现的客户端如下:
Selector selector = Selector.open(); // 创建一个套接字通道,注意这里必须使用无参形式 SocketChannel channel = SocketChannel.open(); // 设置为非阻塞模式,这个方法必须在实际连接之前调用(所以open的时候不能提供服务器地址,否则会自动连接) channel.configureBlocking(false); // 连接服务器,由于是非阻塞模式,这个方法会发起连接请求,并直接返回false(阻塞模式是一直等到链接成功并返回是否成功) channel.connect(new InetSocketAddress("127.0.0.1", 7777)); // 注册关联链接状态 channel.register(selector, SelectionKey.OP_CONNECT); while (true) { // 前略 和服务器端的类似 // ... // 获取发生了关注时间的Key集合,每个SelectionKey对应了注册的一个通道 Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { // OP_CONNECT 两种情况,链接成功或失败这个方法都会返回true if (key.isConnectable()) { // 由于非阻塞模式,connect只管发起连接请求,finishConnect()方法会阻塞到链接结束并返回是否成功 // 另外还有一个isConnectionPending()返回的是是否处于正在连接状态(还在三次握手中) if (channel.finishConnect()) { // 链接成功了可以做一些自己的处理,略 // ... // 处理完后必须吧OP_CONNECT关注去掉,改为关注OP_READ key.interestOps(SelectionKey.OP_READ); } } // 后略 和服务器端的类似 // ... } }
虽然例子是这样的,不过服务器和客户端可以自己单方面选择是否采用非阻塞模式,用阻塞模式的客户端连接非阻塞模式的服务器端是OK的。
二 NIO2的异步IO通道
以下API是由Java7提供。老版本无法使用。
异步IO通道的实现有两种实现方式,一是在阻塞模式的原方法(主要指的是read和write,具体可以查看API文档)上传于一个CompletionHandler实例以实现回调,另外也可以令其返回一个Future实例(Java5新增同步工具包java.util.concurrent中的API),然后再适当的时候通过其get方法来获取返回的结果。异步文件I/O通道为AsynchronousFileChannel,而异步套接字通道为AsynchronousServerSocketChannel,分别对应其各自的原始通道。
异步I/O需要与一个AsynchronousChannelGroup对象关联,他实质上就是一个用于I/O的线程池。AsynchronousChannelGroup对象可以通过其自身静态方法的withThreadPool(),withCachedThreadPool(),withFixedThreadPool()提供一个线程池来创建(线程池也是Java5新增同步工具包java.util.concurrent中的API)。在异步通道创建open()时,可将这个对象传入进行关联。如果没有提供这个对象的话,就默认使用系统分组。但是需要注意的是系统分组的线程池是个守护线程池,JVM是可能在没有读写完成前正常结束的。AsynchronousChannelGroup在使用完后需要shutdowm(),这方面和线程池的关闭是类似的。