Mina实现自定义协议的通信
jopen
12年前
网络的传输使用需要遵循一定的规则,这些规则我们称为协议。如在互联网请求HTML页面的时候,我们要遵循HTTP协议,HTTP头的格式就是我们要遵守的规则:
Request Headers Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Charset: GBK,utf-8;q=0.7,*;q=0.3 Accept-Encoding: gzip,deflate,sdch Accept-Language: zh-CN,zh;q=0.8 Cache-Control: max-age=0
比如我们在做一些第三方开发的时候,经常会按照固定的格式向服务器端发送请求,服务器验证消息后,返回相应的结果。在Mina的开发中,我们也会用到这种模式。首先我们先来描述下这种方式的使用情形:
这样的方式,目前,我所在的项目组中主要用于通信和传输。为了将文件从客户端传到云端,我们先在客户端将数据根据一定的规则切片,然后通过一种特定的格式传输到服务器端。Mina,作为这中间的桥梁,秉承了框架很好的优点,快速开发。由于我们的服务器端采用的是C写的,所以我只给出客户端的编码过程,解码过程原理和开发都一样。
用Mina执行一定协议的传输,主要有以下几个步骤:
1. 设计通信协议;
2. 编写请求(返回)对象和业务对象;
3. 编写解码(编码)器;
4. 编写客户端、服务器端。
下面根据代码,一步步介绍这个过程,由于是前期的测试代码,所以没有考虑线程安全等问题,只是一个思路:
首先是通信的协议,根据mina的构建规则,将协议设计成抽象类,由请求对象和返回对象分别去继承,协议格式如下:
package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.core.buffer.IoBuffer;

/**
 * Base type of the custom wire protocol. Request and response messages each
 * extend this class and supply the individual fields.
 *
 * @author Chen.Hui
 */
public abstract class AbsMessage {

    /**
     * Wire layout, in order:
     *
     * tag | header length | Filename | File length | offset | checksum | temps | data
     */

    /**
     * Message type tag, 8 bits. The original note lists request = 0x00 and
     * response = 0x01.
     * NOTE(review): the request class shown alongside this one returns 0x01 for
     * requests — confirm which value is intended.
     */
    public abstract byte getTag();

    /**
     * Header length, 16 bits.
     * NOTE(review): the original note quotes a range of 65535, but the return
     * type is a signed short — confirm how large values are meant to be read.
     */
    public abstract short getHeaderlen();

    /** Unique file identifier generated from a UUID; 36 bytes (8 * 36 = 288 bits). */
    public abstract byte[] getFilename(); // an algorithm for this still needs to be designed

    /**
     * Total file length, 32 bits (the original note quotes 2^32 = 4GB).
     * NOTE(review): the return type is a signed int — confirm the usable range.
     */
    public abstract int getFileLen();

    /** Offset of this piece within the file, 32 bits. */
    public abstract int getOffset();

    /**
     * Checksum of the file; the original note describes it as an MD5 checksum
     * occupying 32 bits.
     * NOTE(review): an MD5 digest is 128 bits — confirm the intended algorithm
     * or field width.
     */
    public abstract byte[] getChecksum();

    /** Reserved field; per the original note, no longer than 128 bits. */
    public abstract byte[] getTmp();

    /**
     * Payload carried by this message.
     * NOTE(review): the original note limits it to 1024 bits — confirm whether
     * bits or bytes are meant.
     */
    public abstract IoBuffer getData();
}
下面是请求对象(返回)和业务对象,这里的业务对象主要功能是将文件切片:
package com.a2.desktop.example5.mina.potocol; import java.io.IOException; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 请求对象 * * @author Chen.Hui * */ public class InfoRequest extends AbsMessage { Logger logger = LoggerFactory.getLogger(InfoRequest.class); FilePiece piece; Charset charset; public InfoRequest(FilePiece piece) { this.piece = piece; } public InfoRequest(){ //empty } @Override public byte getTag() {// 0x01 请求包 return (byte) 0x01; } @Override public short getHeaderlen() { if (getTmp() == null) { short len = (short) (1 + 2 + 36 + 4 + 4 + 4 ); return len; } else { short len = (short) (1 + 2 + 36 + 4 + 4 + 4 + (short) getTmp().length); return len; } } @Override public int getFileLen() {// 文件总长度 try { return (int) piece.getFc().size(); } catch (IOException e) { e.printStackTrace(); } return 0; } @Override public int getOffset() {// 传输 偏移量 return piece.getOffset(); } @Override public byte[] getFilename() {// 文件名称 /** check the bits of name */ byte[] name = new byte[36]; name = piece.getFilename().getBytes(); return name; } @Override public byte[] getChecksum() {// checksum byte[] checksum = new byte[4]; checksum = piece.getChecksum().getBytes(); return checksum; } @Override public byte[] getTmp() { byte[] b=new byte[5]; return b; } @Override public IoBuffer getData() { return piece.getBuf(); } }业务对象代码,RandomAccessFile可用于随机读写,用于文件的切片,这里还用了管道,主要目的是为了加开读写速度:
package com.a2.desktop.example5.mina.potocol; import java.io.File; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.UUID; import org.apache.mina.core.buffer.IoBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 分片文件操作类 * * @author Chen.Hui * */ public class FilePiece { Logger logger = LoggerFactory.getLogger(FilePiece.class); private ByteBuffer[] dsts; private IoBuffer buf; private String filename; private FileChannel fc; private RandomAccessFile raf; private int offset; private String checksum; /** 构建文件的基本信息 */ public FilePiece(String path, int offset) throws Exception { raf = new RandomAccessFile(new File(path), "rw"); fc = raf.getChannel(); this.offset = offset; dsts = new ByteBuffer[1024]; for (int i = 0; i < dsts.length; i++) { dsts[i] = ByteBuffer.allocate(1024); } fc.read(dsts, offset, 1024); buf=IoBuffer.allocate(1024); filename = UUID.randomUUID().toString(); logger.info("has built:" + filename + " filename size" + filename.length()); } /**这个方法还有点儿问题,数据取的不对*/ public IoBuffer getBuf(){ dsts[0].flip(); while(dsts[0].hasRemaining()){ buf.putChar(dsts[0].getChar()); } buf.flip(); return buf; } public String getFilename() { return filename; } public FileChannel getFc() { return fc; } public RandomAccessFile getRaf() { return raf; } public int getOffset() { return offset; } public String getChecksum() { // TODO checksum algorithems return "aaaa"; } }再接下来是编码器 ,编码器的作用就是讲数据装换成用于传输的流,在Mina中这种流就是IoBuffer:
package com.a2.desktop.example5.mina.potocol; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; /** * 编码器 * @author Chen.Hui * */ public class InfoEncoder implements MessageEncoder{ private Charset charset; public InfoEncoder(Charset charset){ this.charset=charset; } @Override public void encode(IoSession session, AbsMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf=IoBuffer.allocate(1024).setAutoExpand(true); if(message instanceof InfoRequest){ InfoRequest req=(InfoRequest) message; buf.put(req.getTag()); buf.putShort((short)req.getHeaderlen()); buf.put(req.getFilename()); buf.putInt(req.getFileLen()); buf.putInt(req.getOffset()); buf.put(req.getChecksum()); buf.put(req.getTmp()); buf.put(req.getData()); }else if(message instanceof InfoResponse){ //TODO } buf.flip(); out.write(buf); } }
解码器与之类似,解码器在这里的作用主要是用于服务器端解码:
package com.a2.desktop.example5.mina.potocol; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; /** * 解码器 * @author ChenHui * */ public class InfoDecoder implements MessageDecoder { private Charset charset; public InfoDecoder(Charset charset) { this.charset = charset; } @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { //System.out.println("package size:"+in.remaining()); // 报头长度<56 if (in.remaining() < 56) { return MessageDecoderResult.NEED_DATA; } byte tag = in.get(); short head_len=in.getShort(); if (tag == (short) 0x01) { System.out.println("请求标识符:"+tag+" head length:"+head_len); }else{ //System.out.println("未知标识符..."); return MessageDecoderResult.NOT_OK; } return MessageDecoderResult.OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { byte tag=in.get(); if(tag==0x01){ InfoReqContainer irc=new InfoReqContainer(); irc.setTag(tag); irc.setHeadlen(in.getShort()); irc.setFilename(in.getString(36, charset.newDecoder())); irc.setFilelen(in.getInt()); irc.setOffset(in.getInt()); irc.setChecksum(in.getString(4, charset.newDecoder())); irc.setTemp(in.getString(5, charset.newDecoder()));//应该用head len-53 irc.setData(in); out.write(irc); } return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub } }为了解码方便,我这里设计了一个辅助类InfoReqContainer,主要用户辅助操作:
package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.core.buffer.IoBuffer;

/**
 * Holder for the fields of a parsed request. Members are listed in the order
 * the fields appear on the wire.
 *
 * @author Chen.Hui
 */
public class InfoReqContainer {

    private byte tag;
    private short headlen;
    private String filename;
    private int filelen;
    private int offset;
    private String checksum;
    private String temp;
    private IoBuffer data;

    // --- tag ---

    public void setTag(byte tag) {
        this.tag = tag;
    }

    public byte getTag() {
        return tag;
    }

    // --- header length ---

    public void setHeadlen(short headlen) {
        this.headlen = headlen;
    }

    public short getHeadlen() {
        return headlen;
    }

    // --- file name ---

    public void setFilename(String filename) {
        this.filename = filename;
    }

    public String getFilename() {
        return filename;
    }

    // --- file length ---

    public void setFilelen(int filelen) {
        this.filelen = filelen;
    }

    public int getFilelen() {
        return filelen;
    }

    // --- offset ---

    public void setOffset(int offset) {
        this.offset = offset;
    }

    public int getOffset() {
        return offset;
    }

    // --- checksum ---

    public void setChecksum(String checksum) {
        this.checksum = checksum;
    }

    public String getChecksum() {
        return checksum;
    }

    // --- reserved field ---

    public void setTemp(String temp) {
        this.temp = temp;
    }

    public String getTemp() {
        return temp;
    }

    // --- payload ---

    public void setData(IoBuffer data) {
        this.data = data;
    }

    public IoBuffer getData() {
        return data;
    }
}
在mina中,要将解码器和编码器绑定到协议工厂类中,才能被过滤器使用:
package com.a2.desktop.example5.mina.potocol; import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageEncoder; public class InfoCodecFactory extends DemuxingProtocolCodecFactory { private MessageDecoder decoder; private MessageEncoderencoder; public InfoCodecFactory(MessageDecoder decoder, MessageEncoder encoder) { this.decoder = decoder; this.encoder = encoder; addMessageDecoder(this.decoder); addMessageEncoder(AbsMessage.class, this.encoder); } }
做完了这些,编码解码的工作都完成了,最后就是写客户端和服务器端进行测试,注意文件位置,这里没有做提示:
package com.a2.desktop.example5.mina.potocol; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class TestClient { private static String HOST = "127.0.0.1"; private static int PORT = 8082; public static void main(String[] args) { // 创建一个非阻塞的客户端程序 IoConnector connector = new NioSocketConnector(); // 设置链接超时时间 connector.setConnectTimeout(30000); // 添加过滤器 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new InfoCodecFactory( new InfoDecoder(Charset.forName("utf-8")), new InfoEncoder(Charset.forName("utf-8"))))); // 添加业务逻辑处理器类 connector.setHandler(new ClientHandler()); IoSession session = null; try { ConnectFuture future = connector.connect(new InetSocketAddress( HOST, PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session FilePiece piece = new FilePiece( "D:\\Develop Libs Tar\\apache-mina-2.0.7-bin.zip", 0); InfoRequest ir = new InfoRequest(piece); session.write(ir);// 发送消息 } catch (Exception e) { e.printStackTrace(); System.out.println("客户端链接异常..."); } session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); } }
客户端的Handler没有做任何处理:
package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

/**
 * Client-side handler. Every callback is an empty stub: the client does not
 * react to any session event.
 */
public class ClientHandler implements IoHandler {

    /** No action. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        // intentionally empty
    }

    /** No action. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        // intentionally empty
    }

    /** No action. */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        // intentionally empty
    }

    /** No action. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        // intentionally empty
    }

    /**
     * No action.
     * NOTE(review): the cause is neither logged nor rethrown here, so failures
     * reported to this callback are not visible — confirm this is acceptable.
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        // intentionally empty
    }

    /** No action; the received message is ignored. */
    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        //System.out.println(message.toString());
    }

    /** No action. */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        // intentionally empty
    }
}
服务器端:
package com.a2.desktop.example5.mina.potocol; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSessionConfig; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LogLevel; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class TestServer{ private static int PORT = 8082; public static void main(String[] args) { IoAcceptor acceptor = null; try { // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 设置过滤器(添加自带的编解码器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new InfoCodecFactory( new InfoDecoder(Charset.forName("utf-8")), new InfoEncoder(Charset.forName("utf-8"))))); // 设置日志过滤器 LoggingFilter lf = new LoggingFilter(); lf.setMessageReceivedLogLevel(LogLevel.DEBUG); acceptor.getFilterChain().addLast("logger", lf); // 获得IoSessionConfig对象 IoSessionConfig cfg = acceptor.getSessionConfig(); // 读写通道10秒内无操作进入空闲状态 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100); // 绑定逻辑处理器 acceptor.setHandler(new ServerHandler()); // 绑定端口 acceptor.bind(new InetSocketAddress(PORT)); System.out.println("成功开启服务器端..."); } catch (Exception e) { e.printStackTrace(); } } }
服务器端的Handler:
package com.a2.desktop.example5.mina.potocol; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class ServerHandler implements IoHandler { @Override public void sessionCreated(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionOpened(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionClosed(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { // TODO Auto-generated method stub } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { // TODO Auto-generated method stub } @Override public void messageReceived(IoSession session, Object message) throws Exception { if (message instanceof InfoReqContainer) { InfoReqContainer irc = (InfoReqContainer) message; System.out.println("服务器端 获取成功 Tag:" + irc.getTag() + "\r\n " + "head len:" + irc.getHeadlen() + "\r\nfilename: " + irc.getFilename() + "\r\nfile len:"+irc.getFilelen()+"\r\noffset:"+irc.getOffset()+"\r\nchecksum:"+irc.getChecksum()+"\r\ndata:"+irc.getData().toString()); session.write("success rescive"); } else { System.out.println("获取失败"); } } @Override public void messageSent(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub } }主要还是提供一个思路,并没用很多业务的东西,也没有做事务和并发处理,当然这在项目中一定会处理。Mina很好用,用原生的NIO也同样能实现,原理都一样。Mina的异步通信原理网上一大堆,我就不写了。
谢谢观赏。