基于NIO的消息路由的实现(一) 前言

fg68 9年前

一、前言:

已经很久没有碰编码了,大概有9年的时间,日新月异的框架和新东西让我眼花缭乱。之前一直在做web相关的应用。由于项目不大,分布式开发在我编码的那个年代里没有做过,后来走上管理岗位才接触到,仅限于沟通交流和方案的策划,并没有真正的做过。如今我有了一点时间和精力,决定自己学习一下,先从简单的消息通讯开始吧。

好,背景完毕!下面说说我想做的东西,我想做一个基于NIO的消息路由,而并不基于目前已有的各种优秀框架(mina,netty等等),这么做的初衷也许跟我个人的习惯有关,我总是觉得如果不明白原理,即使再好的框架当遭遇问题的时候,我也会无从下手,如果我懂得了原理,再选用其他的框架,也会更得心应手。所以才没有使用现今那些优秀的框架,或许是我的一点点偏见吧。

我的代码已经发布在 http://git.oschina.net/java616

目已经完成根据客户端的标识进行消息的异步转发,仍会持续的迭代和增加。有兴趣的可以下载回去,如果我有做的不好或者不对的地方,敬请指出。

二、一些概念和例程

NIO是啥我就不说了,我们来看一下我理解的NIO工作流程,如图:

基于NIO的消息路由的实现(一) 前言

上图为我所理解的NIO的工作过程,如果存在问题,请批评斧正。概括一下我的理解:

  • SocketChannel:为NIO工作过程中,数据传输的通道,客户端与服务端的每次交互都是通过此通道进行的;

  • Selector(多路复用器):会监控其注册的通道上面的任何事件,获得SelectionKey,事件分为OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(这是SelectionKey的四个属性),OP_ACCEPT应该为服务端接收到客户端连接时的一种状态,我在客户端并没有用到此状态;OP_CONNECT则为客户端已经连接上服务端的一种状态,我在服务端并没有使用这个状态;

  • Buffer:我的应用中,我一直使用ByteBuffer,此类是整个NIO通讯的关键,必须理解才能进行通讯的开发,否则可能产生问题;所有的通讯内容都需要在此类中写入和读出;


如果想做nio相关的应用,那么一些概念上的东西是不可回避的,在这里推荐:http://www.iteye.com/magazines/132-Java-NIO 。

下面三段代码,分别完成了服务的创建、服务对事件的监听以及客户端对事件的监听(不可直接拷贝使用,有一些变量没有声明,如有兴趣,可以去下载我的源码)。

  • 服务的创建

//打开一个serversocket通道,ServerSocketChannel是一个监控是否有新连接进入的通道。  serverSocketChannel = ServerSocketChannel.open();  //将这个serversokect通道设置为非阻塞模式  serverSocketChannel.configureBlocking(false);  //绑定serversokect的ip和端口  serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort()));  //打开选择器  selector = Selector.open();  //将此通道注册给选择器selector  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  • 服务对事件的监听

                //监听事件key                  selector.select(2000);                  //迭代一组事件key                  Iterator<SelectionKey> keys = selector.selectedKeys().iterator();                  while (keys.hasNext()) {                      //定义一个socket通道                      SocketChannel socketChannel = null;                        int count = 0;                        SelectionKey key = keys.next();                      //  Logs.info("有网络事件被触发,事件类型为:" + key.interestOps());                      //删除Iterator中的当前key,避免重复处理                      keys.remove();                      if (!key.isValid()) {                          continue;                      } else if (key.isAcceptable()) {                          //从客户端送来的key中获取ServerSocket通道                          serverSocketChannel = (ServerSocketChannel) key.channel();                          //接收此ServerSocket通道中的Socket通道,accept是一个阻塞方法,一直到获取到连接才会继续                          socketChannel = serverSocketChannel.accept();                          //将此socket通道设置为非阻塞模式                          socketChannel.configureBlocking(false);                          //将此通道注册到selector,并等待接收客户端的读入数据                          socketChannel.register(selector, SelectionKey.OP_READ);                          allocToken(socketChannel);                        } else if (key.isReadable()) {                            //获取事件key中的channel                          socketChannel = (SocketChannel) key.channel();                          ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());                          //清理缓冲区,便于使用                          byteBuffer.clear();                          //将channel中的字节流读入缓冲区                          count = socketChannel.read(byteBuffer);                          byteBuffer.flip();                          //处理粘包                          if (count > 0) {                              try {                                  handlePacket(socketChannel, byteBuffer);                              } catch (Exception e) {                                  e.printStackTrace();  //                                continue;//如果当前包存在非法抛出异常,那么不再进行处理直接跳出循环,处理下一个包;此处存疑,测试阶段暂时注释                              }                          } else if (count == 0) {                              continue;                          } else {                              socketChannel.close();                            }                        } else if (key.isWritable()) {                          ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);                      }                  }

  • 客户端对事件的监听

            while (true) {                  try {                        selector.select(3000);                        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();                      for (int i = 0; keys.hasNext(); i++) {                            SelectionKey key = keys.next();                          keys.remove();                          if (key.isConnectable()) {                              socketChannel = (SocketChannel) key.channel();                              if (socketChannel.isConnectionPending()) {                                  if (socketChannel.finishConnect()){                                      Client.IS_CONNECT =true;                                      logger.info("-------成功连接服务端!-------");                                  }                                }                              socketChannel.register(selector, SelectionKey.OP_READ);                          } else if (key.isReadable()) {                              //获取事件key中的channel                              socketChannel = (SocketChannel) key.channel();                              ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK);                              //清理缓冲区,便于使用                              byteBuffer.clear();                              //将channel中的字节流读入缓冲区                              String readStr = "";                              int count = socketChannel.read(byteBuffer);                              //务必要把buffer的position重置为0                              byteBuffer.flip();                                handlePacket(byteBuffer, count);  //                            socketChannel.register(selector, SelectionKey.OP_READ);                          } else if (key.isWritable()) {                              socketChannel.register(selector, SelectionKey.OP_READ);                          }                        }                  } catch (IOException e) {                      e.printStackTrace();                      continue;                  }                }

三、我要做的是个啥?

根据我个人对NIO的理解,我的初步想法是要实现一个这样的东西,如图:

基于NIO的消息路由的实现(一) 前言

但在我的不断深入开发中,发现上面的图中很多不成熟的内容,作为一个完整的消息通讯的服务,必须包含如下的内容:

1、对接入连接的管理;

2、对连接身份的确认;

3、对异常关闭连接的回收;

4、根据身份对消息的转发;

5、链路的维持;

6、自动重连;

7、消息的异步处理;

8、消息的响应机制;

9、粘包和断包的处理;

9、配置体系;

10、通讯层与业务层的分离;

………………

网上很多的NIO实例都是可以运行的,但并不能满足我的工作需要,以上的那些肯定还有没有考虑全的东西,随着我一点点的开发会逐渐的浮出水面。

在未来的文章中,我会逐步把我自己制定的通讯协议,各个模块的结构,以及代码贴出来,希望大家能够互相学习,互相帮助。(待续)

来自:http://my.oschina.net/u/2397619/blog/493486