Java NIO服务器实例

jopen 10年前

我一直想学习如何用Java写一个非阻塞IO服务器,但无法从网上找到一个满足要求的服务器。我找到了这个示例,但仍然没能解决我的问题。还可以选择Apache MINA框架。但我的要求相对简单,MINA对我来说还稍微有点复杂。所以在MINA和一些教程(参见这篇这篇)的帮助下,我自己写了一个非阻塞IO服务器。

我的代码可以从这里下载。这只是个示例代码,如果需要可以随意修改它。这个示例由一个抽象的非阻塞服务器和一个配对的阻塞客户端组成。需要创建一个具体的实现来使用它们——可以通过测试用例来查看这个样例是如何工作的。两者都被设计为在自己的线程中运行(因此实现了Runnable接口),而且是单线程的——后面会有更多的并发选项。当客户端仅连接到单一服务器时是阻塞的,并且仅在自己的线程中运行。客服端还需要等待服务器端的返回信息,所以将客户端设计为非阻塞是没有意义的。本服务器只处理标准的TCP连接。如果使用的是UDP、SSL或别的协议的话,需要自己添加实现。

在写个示例的代码时,我学到了一些东西。除了调用标准的API来打开和管理连接以外,还掌握了selection keys的不同使用方式、消息处理技巧和线程问题,这些都是十分有用的。

打开和管理一个连接的基本方式在网络上十分常用,而且在下面的示例代码段中也有出现(只有代码片段——可以从代码下载中取得完整版本)。从打开一个 Selector 开始(一种网络信道多路复用器 multiplexor)。Selector通过selectionkey来表示每一个信道,然后打开一个指定端口的套接字节服务器。将selector、SelectionKey.OP_ACCEPT作为参数在socket服务器上注册,任何接入连接在selector上都是有效的。下面的代码一直在循环等待selector的事件。当事件发生时,如果是一个连接请求,套 字节服务器会接受连接并注册链接发出的消息(通过OP_READ 注册)。如果它是一个信息(key.isreadable()),处理信息的代码尚未实现。下面的代码也很脆弱,任何错误都会导致服务器停止工作。

Selector selector = null;  ServerSocketChannel server = null;  try {    selector = Selector.open();    server = ServerSocketChannel.open();    server.socket().bind(new InetSocketAddress(port));    server.configureBlocking(false);    server.register(selector, SelectionKey.OP_ACCEPT);    while (true) {    selector.select();    for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {      SelectionKey key = i.next();      i.remove();      if (key.isConnectable()) {       ((SocketChannel)key.channel()).finishConnect();      }      if (key.isAcceptable()) {       // accept connection       SocketChannel client = server.accept();       client.configureBlocking(false);       client.socket().setTcpNoDelay(true);       client.register(selector, SelectionKey.OP_READ);     }      if (key.isReadable()) {       // ...read messages...     }     }   }       } catch (Throwable e) {    throw new RuntimeException("Server failure: "+e.getMessage());  } finally {   try {    selector.close();    server.socket().close();    server.close();    stopped();   } catch (Exception e) {    // do nothing - server failed   }  }

值得注意的是,一个selection key不代表一个套接字。相反,他们是selector注册的信道。因此,一个来自客户端的连接事件(OP_ACCEPT 事件)将使用与客户端发送消息(OP_READ事件)不同的key通知。这意味着,来自同一个客户端不同类型的事件将会用不同的key。不要试图对这些key进行比较。这样做的好处是,不同的事件 可以用不同的selector注册(这样做的原因是线程的——下面会详细说明)。

当读取一条信息时,有很多的情况需要考虑。当读取连接的结果数据时,这个信息可能是不完整的(剩余的数据要晚些才能获得),也可能包含不止一条消息。因此, 必须考虑消息结尾是如何表示的。读取数据时要将数据放入缓冲和然后拆分为有效的信息。标识消息结尾通常有以下几种方式:

  1. 固定的消息大小。
  2. 将消息的长度作为消息的前缀。
  3. 用一个特殊的符号来标识消息的结束。

我的代码使用了第二种方式。每种方式都会以2个字节开始,用来存储消息体的字节数(因此消息长度被限制为65535字节以内)。因为数据也是使用ByteBuffers来读取的,所以了解一下如何使用它们会很有帮助(可以出这里的API链接入手)。下面的代码会读取数据并将结果传给readmessage方法。在readMessage方法中这些数据被拆分成独立的消息。请注意readbuffer的用法。默认缓冲区应尽可小,但也不要设置过小。这样会造成消息大小经常大于缓冲区。缓冲区越小,处理的速度就越快。但是,如果接收到的消息大小超过缓冲区,那么必须重新缓冲区设置来处理消息。

private List<ByteBuffer> readIncomingMessage(SelectionKey key) throws IOException {    ByteBuffer readBuffer = readBuffers.get(key);    if (readBuffer==null) {    readBuffer = ByteBuffer.allocate(defaultBufferSize);     readBuffers.put(key, readBuffer);    }   if (((ReadableByteChannel)key.channel()).read(readBuffer)==-1) {    throw new IOException("Read on closed key");   }     readBuffer.flip();    List<ByteBuffer> result = new ArrayList<ByteBuffer>();     ByteBuffer msg = readMessage(key, readBuffer);   while (msg!=null) {    result.add(msg);    msg = readMessage(key, readBuffer);   }      return result;  }

下面的代码用来将缓存数据转化为消息。

private ByteBuffer readMessage(SelectionKey key, ByteBuffer readBuffer) {   int bytesToRead;    if (readBuffer.remaining()>messageLength.byteLength()) { // must have at least enough bytes to read the size of the message      byte[] lengthBytes = new byte[messageLength.byteLength()];    readBuffer.get(lengthBytes);    bytesToRead = (int)messageLength.bytesToLength(lengthBytes);    if ((readBuffer.limit()-readBuffer.position())<bytesToRead) {      // Not enough data - prepare for writing again      if (readBuffer.limit()==readBuffer.capacity()) {         // message may be longer than buffer => resize buffer to message size      int oldCapacity = readBuffer.capacity();      ByteBuffer tmp = ByteBuffer.allocate(bytesToRead+messageLength.byteLength());      readBuffer.position(0);      tmp.put(readBuffer);      readBuffer = tmp;             readBuffer.position(oldCapacity);          readBuffer.limit(readBuffer.capacity());       readBuffers.put(key, readBuffer);          return null;        } else {         // rest for writing         readBuffer.position(readBuffer.limit());          readBuffer.limit(readBuffer.capacity());          return null;         }    }    } else {     // Not enough data - prepare for writing again     readBuffer.position(readBuffer.limit());     readBuffer.limit(readBuffer.capacity());     return null;    }    byte[] resultMessage = new byte[bytesToRead];   readBuffer.get(resultMessage, 0, bytesToRead);    // remove read message from buffer   int remaining = readBuffer.remaining();   readBuffer.limit(readBuffer.capacity());   readBuffer.compact();   readBuffer.position(0);   readBuffer.limit(remaining);   return ByteBuffer.wrap(resultMessage);  }

示例中的代码是单线程的——所有的连接都是由同一个线程处理。也可以使用多线程。尽管在某一时刻只有一个线程可以工作(也就是说,不可能有2个线程都在在执行读操作),但是读写操作可以由不同的线程通过独立的key来完成。同样的,在某一时刻只有一个线程可以使用selector。虽然单线程代码就能满足我的需要,但是有很多的方法可以并发处理。下面我分别描述使用线程池数据读事件、使用单一selector和线程处理OP_ACCEPT事件。

  1. 用一个selector来对应多个客户端连接。收到accept事件时,会创建一个新的selector并在这个新的selector上注册读事件。新创建的selector用来监听和处理读事件,这个任务是在线程池中执行的。由于不能确定selector对资源占用的影响,所以不知道这种做法的扩展性如何。
  2. 每个线程都启用一个selector,在创建执行线程时通过负载均衡的方式分配一个selector。将客户端分配给对应的selector,每个线程都在自己的selector中处理读事件,这是MINA的处理方式。这样处理问题是如何均衡线程的处理(MINA使用了轮叫round-robin调度算法)——如果不小心,结果会导致是有的线程非常繁忙有的线程处于空闲状态。
  3. 所有的事件都在同一个selector上处理,同步时需要小心处理。当传递key给某一个线程准备读取时,要保证这个key没有正准备被其他的线程所读取,直到当前的操作结束。
    在我想到最好的解决方式之前,selector处理的工作会非常繁重。

我会将如何处理并发这个问题留给感兴趣的读者。祝读者们在编码过程中一切顺利,我的例子可以在这里下载。

2011年12月22日更新:有读者来信指出来原始的测试用例中有bug,有些测试用例中使用的是将字节转换为字节流 InputStreamReader。如果使用了非8位的字符集,那么测试用户将由于消息长度而失败(发生在转意消息头部时),我已更新了示例中的测试用例修正该问题。

原文链接: cordinc 翻译: ImportNew.com - 一直在路上
译文链接: http://www.importnew.com/13602.html