Apache Mina server简单编解码实现
openkk
13年前
协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据
时将JAVA 对象编码二进制数据,而接收数据时将二进制数据解码为JAVA 对象(这个可不是JAVA 对象的序列化、反序列化那么简单的事情)。废话少说直接上代码:
Server:
package com.cemso.mina.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.cemso.mina.coder.CmccSipcCodecFactory; /** * @author gl65293 */ public class MinaTimeServer { private static final int PORT = 9123; public static void main(String[] args) throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("logger", new LoggingFilter()); // acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8")))); acceptor.setHandler(new TimeServerHandler()); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); acceptor.bind(new InetSocketAddress(PORT)); } }
package com.cemso.mina.server; /** * @author gl65293 * */ import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import com.cemso.mina.dto.SmsObject; public class TimeServerHandler extends IoHandlerAdapter { public TimeServerHandler() { } public void exceptionCaught(IoSession session, Throwable cause) throws Exception { cause.printStackTrace(); } public void messageReceived(IoSession session, Object message) throws Exception { SmsObject sms = (SmsObject)message; System.out.println(sms.getMessage()); System.out.println(sms.getSender()); System.out.println(sms.getReceiver()); } public void sessionIdle(IoSession session, IdleStatus status) throws Exception { System.out.println((new StringBuilder()).append("IDLE ").append(session.getIdleCount(status)).toString()); } }dto:
package com.cemso.mina.dto; /** * @author gl65293 * */ public class SmsObject { private String sender; private String receiver; private String message; /** * @return the sender */ public String getSender() { return sender; } /** * @param sender the sender to set */ public void setSender(String sender) { this.sender = sender; } /** * @return the receiver */ public String getReceiver() { return receiver; } /** * @param receiver the receiver to set */ public void setReceiver(String receiver) { this.receiver = receiver; } /** * @return the message */ public String getMessage() { return message; } /** * @param message the message to set */ public void setMessage(String message) { this.message = message; } }encoder and decoder:
package com.cemso.mina.coder; import java.nio.charset.Charset; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; /** * @author gl65293 * */ public class CmccSipcCodecFactory implements ProtocolCodecFactory { private final CmccSipcEncoder encoder; private final CmccSipcDecoder decoder; public CmccSipcCodecFactory(){ this(Charset.defaultCharset()); } /** * @param defaultCharset */ public CmccSipcCodecFactory(Charset charSet) { this.encoder = new CmccSipcEncoder(charSet); this.decoder = new CmccSipcDecoder(charSet); } /* (non-Javadoc) * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getDecoder(org.apache.mina.core.session.IoSession) */ @Override public ProtocolDecoder getDecoder(IoSession iosession) throws Exception { // TODO Auto-generated method stub return decoder; } /* (non-Javadoc) * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getEncoder(org.apache.mina.core.session.IoSession) */ @Override public ProtocolEncoder getEncoder(IoSession iosession) throws Exception { // TODO Auto-generated method stub return encoder; } }
package com.cemso.mina.coder; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import com.cemso.mina.dto.SmsObject; /** * @author gl65293 * */ public class CmccSipcDecoder extends CumulativeProtocolDecoder { private final Charset charset; /** * @param charset */ public CmccSipcDecoder(Charset charset) { this.charset = charset; } /* (non-Javadoc) * @see org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode(org.apache.mina.core.session.IoSession, org.apache.mina.core.buffer.IoBuffer, org.apache.mina.filter.codec.ProtocolDecoderOutput) */ @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); CharsetDecoder decoder = charset.newDecoder(); int matchCount = 0; String statusLine = ""; String sender = ""; String receiver = ""; String length = ""; String sms=""; int i = 1; while(in.hasRemaining()){ byte b = in.get(); buffer.put(b); if(b == 10 && i < 5){ matchCount ++; if(i == 1){ buffer.flip(); statusLine = buffer.getString(matchCount,decoder); statusLine = statusLine.substring(0, statusLine.length()-1); matchCount = 0; buffer.clear(); } if(i == 2){ buffer.flip(); sender = buffer.getString(matchCount,decoder); sender = sender.substring(0, sender.length()-1); matchCount = 0; buffer.clear(); } if(i == 3){ buffer.flip(); receiver = buffer.getString(matchCount,decoder); receiver = receiver.substring(0, receiver.length()-1); matchCount = 0; buffer.clear(); } if(i == 4){ buffer.flip(); length = buffer.getString(matchCount,decoder); length = length.substring(0, length.length()-1); matchCount = 0; buffer.clear(); } i++; }else if(i == 5){ matchCount ++; if(matchCount == Long.parseLong(length.split(": ")[1])){ buffer.flip(); sms = buffer.getString(matchCount,decoder); i++; break; } }else{ matchCount ++; } } SmsObject smsObject = new SmsObject(); smsObject.setSender(sender.split(": ")[1]); smsObject.setReceiver(receiver.split(": ")[1]); smsObject.setMessage(sms); out.write(smsObject); return false; } }
package com.cemso.mina.coder; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import com.cemso.mina.dto.SmsObject; /** * @author gl65293 * */ public class CmccSipcEncoder extends ProtocolEncoderAdapter { private final Charset charset; public CmccSipcEncoder(Charset charset){ this.charset = charset; } /* (non-Javadoc) * @see org.apache.mina.filter.codec.ProtocolEncoder#encode(org.apache.mina.core.session.IoSession, java.lang.Object, org.apache.mina.filter.codec.ProtocolEncoderOutput) */ @Override public void encode(IoSession iosession, Object obj, ProtocolEncoderOutput out) throws Exception { // TODO Auto-generated method stub SmsObject sms = (SmsObject)obj; CharsetEncoder charst = charset.newEncoder(); IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = sms.getSender(); String recevier = sms.getReceiver(); String smsContent = sms.getMessage(); buffer.putString(statusLine+'\n', charst); buffer.putString("S: "+sender+'\n', charst); buffer.putString("R: "+recevier+'\n', charst); buffer.putString("L: "+smsContent.getBytes(charset).length+"\n", charst); buffer.putString(smsContent, charst); buffer.flip(); out.write(buffer); } }client:
package com.cemso.mina.client; /** * @author gl65293 * */ import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoConnector; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.cemso.mina.coder.CmccSipcCodecFactory; // Referenced classes of package org.apache.mina.example.sumup: // ClientSessionHandler public class Client { public Client() { } public static void main(String args[]) throws Throwable { /* if (args.length == 0) { System.out.println("Please specify the list of any integers"); return; } // prepare values to sum up int[] values = new int[args.length]; for (int i = 0; i < args.length; i++) { values[i] = Integer.parseInt(args[i]); } NioSocketConnector connector = new NioSocketConnector(); // Configure the service. connector.setConnectTimeoutMillis(CONNECT_TIMEOUT); if (USE_CUSTOM_CODEC) { connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SumUpProtocolCodecFactory(false))); } else { connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); } connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.setHandler(new ClientSessionHandler(values)); IoSession session; for (;;) { try { ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT)); future.awaitUninterruptibly(); session = future.getSession(); break; } catch (RuntimeIoException e) { System.err.println("Failed to connect."); e.printStackTrace(); Thread.sleep(5000); } } // wait until the summation is done session.getCloseFuture().awaitUninterruptibly(); connector.dispose();*/ IoConnector connector = new NioSocketConnector(); connector.setHandler(new ClientSessionHandler("你好!\r\n哈哈!")); connector.setConnectTimeoutMillis(30000); // connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8")))); connector.connect(new InetSocketAddress("127.0.0.1",9123)); } }
package com.cemso.mina.client; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import com.cemso.mina.dto.SmsObject; /** * @author gl65293 * */ public class ClientSessionHandler extends IoHandlerAdapter{ private final String values; public ClientSessionHandler(String values){ this.values = values; } public void sessionOpened(IoSession session){ //session.write(values); SmsObject sms = new SmsObject(); sms.setSender("18817261072"); sms.setReceiver("15951892458"); sms.setMessage("你好! Hello world!"); session.write(sms); } }