Mina实现自定义协议的通信

jopen 12年前

网络的传输使用需要遵循一定的规则,这些规则我们称为协议。如在互联网请求HTML页面的时候,我们要遵循HTTP协议,HTTP头的格式就是我们要遵守的规则:

Request Headers  Accept:  text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8  Accept-Charset:  GBK,utf-8;q=0.7,*;q=0.3  Accept-Encoding:  gzip,deflate,sdch  Accept-Language:  zh-CN,zh;q=0.8  Cache-Control:  max-age=0

比如我们在做一些第三方开发的时候,经常会按照固定的格式向服务器端发送请求,服务器验证消息后,返回相应的结果。在Mina的开发中,我们也会用到这种模式。首先我们先来描述下这种方式的使用情形:

这样的方式,目前,我所在的项目组中主要用于通信和传输。为了将文件从客户端传到云端,我们先在客户端将数据根据一定的规则切片,然后通过一种特定的格式传输到服务器端。Mina,作为这中间的桥梁,秉承了框架很好的优点,快速开发。由于我们的服务器端采用的是C写的,所以我只给出客户端的编码过程,解码过程原理和开发都一样。

 用Mina执行一定协议的传输,主要有以下几个步骤:

1.         设计通信协议;

2.         编写请求(返回)对象和业务对象;

3.         编写解码(编码)器;

4.         编写客户端、服务器端。

Mina实现自定义协议的通信

下面根据代码,一步步介绍这个过程。由于是前期的测试代码,所以没有考虑线程安全等问题,只是提供一个思路:

首先是通信的协议,根据mina的构建规则,将协议设计成抽象类,由请求对象和返回对象分别去继承,协议格式如下:

package com.a2.desktop.example5.mina.potocol;      import org.apache.mina.core.buffer.IoBuffer;    /**   * 通信协议   * @author Chen.Hui   *    */  public abstract class AbsMessage {      /**    * 协议格式:    *     * tag | header length | Filename | File length | offset | checksum | temps | data    *     */     /** 请求或访问类型 请求Tag:0x00 返回Tag:0x01 共 8 bit */   public abstract byte getTag();     /** 头文件长度 共 2^16 可表示 65535 */   public abstract short getHeaderlen();     /** 根据UUID生成文件唯一标识,共 8*36=288 bit */   public abstract byte[] getFilename();//需要設計一個算法     /** 获取文件长度 2^32=4GB 共 32 bit */   public abstract int getFileLen();     /** 获取文件的偏移量offset 共 32 bit */   public abstract int getOffset();     /** 获取文件的MD5校验码 共 32 bit */   public abstract byte[] getChecksum();     /** 预留字段 长度不超过 128 bit */   public abstract byte[] getTmp();      /**data 方式传输内容 不超过1024bit*/   public abstract IoBuffer getData();    }

下面是请求对象(返回)和业务对象,这里的业务对象主要功能是将文件切片:

package com.a2.desktop.example5.mina.potocol;    import java.io.IOException;  import java.nio.charset.Charset;    import org.apache.mina.core.buffer.IoBuffer;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;    /**   * 请求对象   *    * @author Chen.Hui   *    */  public class InfoRequest extends AbsMessage {     Logger logger = LoggerFactory.getLogger(InfoRequest.class);     FilePiece piece;     Charset charset;     public InfoRequest(FilePiece piece) {    this.piece = piece;   }      public InfoRequest(){    //empty   }     @Override   public byte getTag() {// 0x01 请求包    return (byte) 0x01;   }     @Override   public short getHeaderlen() {    if (getTmp() == null) {     short len = (short) (1 + 2 + 36 + 4 + 4 + 4 );     return len;    } else {     short len = (short) (1 + 2 + 36 + 4 + 4 + 4 + (short) getTmp().length);     return len;    }   }     @Override   public int getFileLen() {// 文件总长度      try {     return (int) piece.getFc().size();      } catch (IOException e) {     e.printStackTrace();    }    return 0;   }     @Override   public int getOffset() {// 传输 偏移量      return piece.getOffset();     }     @Override   public byte[] getFilename() {// 文件名称      /** check the bits of name */    byte[] name = new byte[36];    name = piece.getFilename().getBytes();      return name;     }     @Override   public byte[] getChecksum() {// checksum      byte[] checksum = new byte[4];    checksum = piece.getChecksum().getBytes();     return checksum;   }     @Override   public byte[] getTmp() {    byte[] b=new byte[5];    return b;   }     @Override   public IoBuffer getData() {    return piece.getBuf();   }  }
业务对象代码,RandomAccessFile可用于随机读写,用于文件的切片,这里还用了通道(FileChannel),主要目的是为了加快读写速度:



package com.a2.desktop.example5.mina.potocol;    import java.io.File;  import java.io.RandomAccessFile;  import java.nio.ByteBuffer;  import java.nio.channels.FileChannel;  import java.util.UUID;    import org.apache.mina.core.buffer.IoBuffer;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;    /**   * 分片文件操作类   *    * @author Chen.Hui   *    */  public class FilePiece {     Logger logger = LoggerFactory.getLogger(FilePiece.class);     private ByteBuffer[] dsts;      private IoBuffer buf;     private String filename;     private FileChannel fc;     private RandomAccessFile raf;     private int offset;     private String checksum;     /** 构建文件的基本信息 */   public FilePiece(String path, int offset) throws Exception {      raf = new RandomAccessFile(new File(path), "rw");    fc = raf.getChannel();      this.offset = offset;      dsts = new ByteBuffer[1024];      for (int i = 0; i < dsts.length; i++) {     dsts[i] = ByteBuffer.allocate(1024);    }      fc.read(dsts, offset, 1024);            buf=IoBuffer.allocate(1024);      filename = UUID.randomUUID().toString();    logger.info("has built:" + filename + " filename size"      + filename.length());     }   /**这个方法还有点儿问题,数据取的不对*/   public IoBuffer getBuf(){    dsts[0].flip();    while(dsts[0].hasRemaining()){     buf.putChar(dsts[0].getChar());    }    buf.flip();    return buf;   }     public String getFilename() {    return filename;   }     public FileChannel getFc() {    return fc;   }     public RandomAccessFile getRaf() {    return raf;   }     public int getOffset() {    return offset;   }     public String getChecksum() {    // TODO checksum algorithems    return "aaaa";   }  }
再接下来是编码器,编码器的作用就是将数据转换成用于传输的流,在Mina中这种流就是IoBuffer:
package com.a2.desktop.example5.mina.potocol;    import java.nio.charset.Charset;    import org.apache.mina.core.buffer.IoBuffer;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolEncoderOutput;  import org.apache.mina.filter.codec.demux.MessageEncoder;    /**   * 编码器   * @author Chen.Hui   *   */  public class InfoEncoder implements MessageEncoder{     private Charset charset;      public InfoEncoder(Charset charset){    this.charset=charset;   }      @Override   public void encode(IoSession session, AbsMessage message,     ProtocolEncoderOutput out) throws Exception {        IoBuffer buf=IoBuffer.allocate(1024).setAutoExpand(true);        if(message instanceof InfoRequest){          InfoRequest req=(InfoRequest) message;     buf.put(req.getTag());     buf.putShort((short)req.getHeaderlen());     buf.put(req.getFilename());     buf.putInt(req.getFileLen());     buf.putInt(req.getOffset());     buf.put(req.getChecksum());     buf.put(req.getTmp());     buf.put(req.getData());         }else if(message instanceof InfoResponse){     //TODO    }          buf.flip();        out.write(buf);   }  }

解码器与之类似,解码器在这里的作用主要用于服务器端解码:

package com.a2.desktop.example5.mina.potocol;    import java.nio.charset.Charset;    import org.apache.mina.core.buffer.IoBuffer;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolDecoderOutput;  import org.apache.mina.filter.codec.demux.MessageDecoder;  import org.apache.mina.filter.codec.demux.MessageDecoderResult;  /**   * 解码器   * @author ChenHui   *   */  public class InfoDecoder implements MessageDecoder {     private Charset charset;     public InfoDecoder(Charset charset) {    this.charset = charset;   }     @Override   public MessageDecoderResult decodable(IoSession session, IoBuffer in) {      //System.out.println("package size:"+in.remaining());    // 报头长度<56    if (in.remaining() < 56) {     return MessageDecoderResult.NEED_DATA;    }      byte tag = in.get();    short head_len=in.getShort();      if (tag == (short) 0x01) {     System.out.println("请求标识符:"+tag+" head length:"+head_len);    }else{     //System.out.println("未知标识符...");     return MessageDecoderResult.NOT_OK;    }      return MessageDecoderResult.OK;   }     @Override   public MessageDecoderResult decode(IoSession session, IoBuffer in,     ProtocolDecoderOutput out) throws Exception {    byte tag=in.get();       if(tag==0x01){     InfoReqContainer irc=new InfoReqContainer();     irc.setTag(tag);     irc.setHeadlen(in.getShort());     irc.setFilename(in.getString(36, charset.newDecoder()));     irc.setFilelen(in.getInt());     irc.setOffset(in.getInt());     irc.setChecksum(in.getString(4, charset.newDecoder()));     irc.setTemp(in.getString(5, charset.newDecoder()));//应该用head len-53     irc.setData(in);          out.write(irc);    }         return MessageDecoderResult.OK;   }     @Override   public void finishDecode(IoSession session, ProtocolDecoderOutput out)     throws Exception {    // TODO Auto-generated method stub     }    }
为了解码方便,我这里设计了一个辅助类InfoReqContainer,主要用于辅助操作:

package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.core.buffer.IoBuffer;

/**
 * Plain holder for the fields of a decoded request.
 *
 * @author Chen.Hui
 */
public class InfoReqContainer {

    /** Message-type tag. */
    private byte tag;

    /** Header length as transmitted. */
    private short headlen;

    /** File identifier. */
    private String filename;

    /** Total file length. */
    private int filelen;

    /** Offset of the piece within the file. */
    private int offset;

    /** Checksum text. */
    private String checksum;

    /** Reserved field. */
    private String temp;

    /** Payload. */
    private IoBuffer data;

    /** @return the message-type tag */
    public byte getTag() {
        return this.tag;
    }

    /** @param tag the message-type tag */
    public void setTag(byte tag) {
        this.tag = tag;
    }

    /** @return the header length */
    public short getHeadlen() {
        return this.headlen;
    }

    /** @param headlen the header length */
    public void setHeadlen(short headlen) {
        this.headlen = headlen;
    }

    /** @return the file identifier */
    public String getFilename() {
        return this.filename;
    }

    /** @param filename the file identifier */
    public void setFilename(String filename) {
        this.filename = filename;
    }

    /** @return the total file length */
    public int getFilelen() {
        return this.filelen;
    }

    /** @param filelen the total file length */
    public void setFilelen(int filelen) {
        this.filelen = filelen;
    }

    /** @return the offset of the piece */
    public int getOffset() {
        return this.offset;
    }

    /** @param offset the offset of the piece */
    public void setOffset(int offset) {
        this.offset = offset;
    }

    /** @return the reserved field */
    public String getTemp() {
        return this.temp;
    }

    /** @param temp the reserved field */
    public void setTemp(String temp) {
        this.temp = temp;
    }

    /** @return the checksum text */
    public String getChecksum() {
        return this.checksum;
    }

    /** @param checksum the checksum text */
    public void setChecksum(String checksum) {
        this.checksum = checksum;
    }

    /** @return the payload buffer */
    public IoBuffer getData() {
        return this.data;
    }

    /** @param data the payload buffer */
    public void setData(IoBuffer data) {
        this.data = data;
    }
}

mina中,要将解码器和编码器绑定到协议工厂类中,才能被过滤器使用:

package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageEncoder;

/**
 * Codec factory that registers one decoder and one encoder for {@link AbsMessage} so that a
 * {@code ProtocolCodecFilter} can use them.
 */
public class InfoCodecFactory extends DemuxingProtocolCodecFactory {

    private final MessageDecoder decoder;

    /** Parameterized (not raw) so the registration below is type-checked. */
    private final MessageEncoder<? super AbsMessage> encoder;

    /**
     * Registers the given codec pair with this factory.
     *
     * @param decoder decoder for incoming data
     * @param encoder encoder used for every outgoing {@link AbsMessage}; this is the only
     *                message type an encoder is registered for
     */
    public InfoCodecFactory(MessageDecoder decoder,
            MessageEncoder<? super AbsMessage> encoder) {
        this.decoder = decoder;
        this.encoder = encoder;
        addMessageDecoder(this.decoder);
        addMessageEncoder(AbsMessage.class, this.encoder);
    }
}

做完了这些,编码解码的工作都完成了,最后就是写客户端和服务器端进行测试,注意文件位置,这里没有做提示:

package com.a2.desktop.example5.mina.potocol;    import java.net.InetSocketAddress;  import java.nio.charset.Charset;    import org.apache.mina.core.future.ConnectFuture;  import org.apache.mina.core.service.IoConnector;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolCodecFilter;  import org.apache.mina.transport.socket.nio.NioSocketConnector;    public class TestClient {      private static String HOST = "127.0.0.1";     private static int PORT = 8082;     public static void main(String[] args) {    // 创建一个非阻塞的客户端程序    IoConnector connector = new NioSocketConnector();    // 设置链接超时时间    connector.setConnectTimeout(30000);    // 添加过滤器    connector.getFilterChain().addLast(      "codec",      new ProtocolCodecFilter(new InfoCodecFactory(        new InfoDecoder(Charset.forName("utf-8")),        new InfoEncoder(Charset.forName("utf-8")))));    // 添加业务逻辑处理器类    connector.setHandler(new ClientHandler());    IoSession session = null;    try {     ConnectFuture future = connector.connect(new InetSocketAddress(       HOST, PORT));// 创建连接     future.awaitUninterruptibly();// 等待连接创建完成     session = future.getSession();// 获得session       FilePiece piece = new FilePiece(       "D:\\Develop Libs Tar\\apache-mina-2.0.7-bin.zip", 0);       InfoRequest ir = new InfoRequest(piece);           session.write(ir);// 发送消息              } catch (Exception e) {  e.printStackTrace();     System.out.println("客户端链接异常...");    }      session.getCloseFuture().awaitUninterruptibly();// 等待连接断开    connector.dispose();   }    }

客户端的Handler没有做任何处理:

package com.a2.desktop.example5.mina.potocol;    import org.apache.mina.core.service.IoHandler;  import org.apache.mina.core.session.IdleStatus;  import org.apache.mina.core.session.IoSession;    public class ClientHandler implements IoHandler {     @Override   public void sessionCreated(IoSession session) throws Exception {    // TODO Auto-generated method stub     }     @Override   public void sessionOpened(IoSession session) throws Exception {    // TODO Auto-generated method stub     }     @Override   public void sessionClosed(IoSession session) throws Exception {    // TODO Auto-generated method stub     }     @Override   public void sessionIdle(IoSession session, IdleStatus status)     throws Exception {    // TODO Auto-generated method stub     }     @Override   public void exceptionCaught(IoSession session, Throwable cause)     throws Exception {    // TODO Auto-generated method stub     }     @Override   public void messageReceived(IoSession session, Object message)     throws Exception {    //System.out.println(message.toString());   }     @Override   public void messageSent(IoSession session, Object message) throws Exception {    // TODO Auto-generated method stub     }    }
服务器端:
package com.a2.desktop.example5.mina.potocol;    import java.net.InetSocketAddress;  import java.nio.charset.Charset;    import org.apache.mina.core.service.IoAcceptor;  import org.apache.mina.core.session.IdleStatus;  import org.apache.mina.core.session.IoSessionConfig;  import org.apache.mina.filter.codec.ProtocolCodecFilter;  import org.apache.mina.filter.logging.LogLevel;  import org.apache.mina.filter.logging.LoggingFilter;  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;        public class TestServer{     private static int PORT = 8082;     public static void main(String[] args) {    IoAcceptor acceptor = null;    try {     // 创建一个非阻塞的server端的Socket     acceptor = new NioSocketAcceptor();       // 设置过滤器(添加自带的编解码器)     acceptor.getFilterChain().addLast(       "codec",       new ProtocolCodecFilter(new InfoCodecFactory(         new InfoDecoder(Charset.forName("utf-8")),         new InfoEncoder(Charset.forName("utf-8")))));     // 设置日志过滤器     LoggingFilter lf = new LoggingFilter();     lf.setMessageReceivedLogLevel(LogLevel.DEBUG);     acceptor.getFilterChain().addLast("logger", lf);     // 获得IoSessionConfig对象     IoSessionConfig cfg = acceptor.getSessionConfig();     // 读写通道10秒内无操作进入空闲状态     cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100);       // 绑定逻辑处理器     acceptor.setHandler(new ServerHandler());     // 绑定端口     acceptor.bind(new InetSocketAddress(PORT));     System.out.println("成功开启服务器端...");         } catch (Exception e) {          e.printStackTrace();    }   }  }        

服务器端的Handler

package com.a2.desktop.example5.mina.potocol;    import org.apache.mina.core.service.IoHandler;  import org.apache.mina.core.session.IdleStatus;  import org.apache.mina.core.session.IoSession;    public class ServerHandler implements IoHandler {     @Override   public void sessionCreated(IoSession session) throws Exception {    // TODO Auto-generated method stub     }     @Override   public void sessionOpened(IoSession session) throws Exception {    // TODO Auto-generated method stub     }     @Override   public void sessionClosed(IoSession session) throws Exception {    // TODO Auto-generated method stub     }     @Override   public void sessionIdle(IoSession session, IdleStatus status)     throws Exception {    // TODO Auto-generated method stub     }     @Override   public void exceptionCaught(IoSession session, Throwable cause)     throws Exception {    // TODO Auto-generated method stub     }     @Override   public void messageReceived(IoSession session, Object message)     throws Exception {    if (message instanceof InfoReqContainer) {     InfoReqContainer irc = (InfoReqContainer) message;     System.out.println("服务器端 获取成功 Tag:" + irc.getTag() + "\r\n " + "head len:"       + irc.getHeadlen() + "\r\nfilename: " + irc.getFilename()       + "\r\nfile len:"+irc.getFilelen()+"\r\noffset:"+irc.getOffset()+"\r\nchecksum:"+irc.getChecksum()+"\r\ndata:"+irc.getData().toString());          session.write("success rescive");         } else {     System.out.println("获取失败");    }     }     @Override   public void messageSent(IoSession session, Object message) throws Exception {    // TODO Auto-generated method stub     }    }
主要还是提供一个思路,并没有很多业务的东西,也没有做事务和并发处理,当然这在项目中一定会处理。Mina很好用,用原生的NIO也同样能实现,原理都一样。Mina的异步通信原理网上一大堆,我就不写了。

谢谢观赏。