Jva NIO框架 Apache Mina 2.x 简易入门解析
最近使用Mina开发一个Java的NIO服务端程序,因此也特意学习了Apache的这个Mina框架。
首先,Mina是个什么东西?看下项目主页(http://mina.apache.org/)对它的解释:
Apache的Mina(Multipurpose Infrastructure Networked Applications)是一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它提供了一个抽象的、事件驱动的异步API,使 Java NIO在各种传输协议(如TCP/IP,UDP/IP协议等)下快速高效开发。
Apache Mina也称为:
- NIO框架
- 客户端/服务端框架(典型的C/S架构)
- 网络套接字(networking socket)类库
- 事件驱动的异步API(注意:在JDK7中也新增了异步API)
总之:我们简单理解它是一个封装底层IO操作,提供高级操作API的通讯框架!
在Mina的官网、以及网上都有比较丰富的文档了,这里我就稍微简单说一下Mina的结构和示例代码。
因为Mina2.X改进了Mina的代码结构和包结构,降低了使用的复杂性和增强了健壮性,所以使得API发生了比较大的改变,有许多地方已经和Mina1.x不兼容了。
这里使用的是Mina2.0.4
1.Mina的结构
Mina的通信流程大致如上图所示,各个组件功能有:
(1.) IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监
听是否有连接被建立。
(Mina底层使用JAVA NIO, 因此它是典型的使用Reactor模式架构的,采用事件驱动编程 , Mina运行用户自定义线程模型,可以是单线程、多线程、线程池等 ,
跟JAVA Socket不一样, Mina是非阻塞的Socket,它内部已经保证了对各个连接(session)的业务和数据的隔离,采用轮询机制为各个session分配CPU资源,
所以,你就不需要再去考虑不同Socket连接需要用不同的线程去操纵的问题了。)
(2.) IoProcessor:这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是
说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,
通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService
与 IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上
的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、
数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与 decode
是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。
2. Mina编程的大致过程.
2.1 总体流程
建立服务器端的资源: 包括 Acceptor的建立,之后为Acceptor配置相应的Filter(可以是Mina自带的Filter或者自定义的Filter),
之后再配置相应基于事件驱动的处理业务逻辑的IoHandler.
建立客户端的资源: Mina采用了统一的编程模型,所以建立客户端的过程和建立服务器端的过程大致上是相似的,不过这里建立的是Connector.
2.2 示例程序。(使用jar包为 mina-core-2.0.4.jar)
下面通过一个简单的示例程序来进一步理解Mina的运行机制。
该程序实现简单的即时通讯功能。 即,多个客户端可以同时脸上服务器,并进行类似于聊天室一样的通信。
2.2.1 建立自定义的TextLineCodecFacotry
为了了解Mina的代码功能以及运行机制,我们模拟实现了类似Mina自带TextLineCodecFactory。
该CodecFactory功能是: 配合ProtocolCodecFilter 进行对底层数据(binary二进制数据流)和高层数据(特定类型的数据对象信息,例如String)之间的转换。
这里实现了一个断行读取功能,即遇到'\n'的时候,就认为是一个String Line , 将这段数据流封装成String,之后再交给下一个Filter或者Handler处理。
CodecFactory是一个工厂方法,底层通过一个Decoder和Encoder来提供对数据进行解、编码的操作,载体是IoBuffer。
(解码:将二进制数据转换成高层数据(对象) 编码:将高层数据(对象)转换成二进制数据流) )
1)建立Decoder (MyTextLineDecoder)
实现了ProtocolDecoder接口
package com.mai.mina.diyCodecFilter; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; public class MyTextLineDecoder implements ProtocolDecoder{ Charset charset = Charset.forName("UTF-8"); IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true); @Override public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput output) throws Exception { // TODO Auto-generated method stub while(in.hasRemaining()){ byte b = in.get(); if(b == '\n'){ buf.flip(); byte[] bytes = new byte[buf.limit()]; buf.get(bytes); String message = new String(bytes,charset); buf = IoBuffer.allocate(100).setAutoExpand(true); output.write(message); }else{ buf.put(b); } } } @Override public void dispose(IoSession arg0) throws Exception { // TODO Auto-generated method stub } @Override public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1) throws Exception { // TODO Auto-generated method stub } }
2)建立Encoder (MyTextLineEncoder)
实现了ProtocolEncoder接口
package com.mai.mina.diyCodecFilter; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.textline.LineDelimiter; public class MyTextLineEncoder implements ProtocolEncoder{ Charset charset = Charset.forName("UTF-8"); @Override public void dispose(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput output) throws Exception { // TODO Auto-generated method stub IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true); buf.putString(message.toString(), charset.newEncoder()); buf.putString(LineDelimiter.DEFAULT.getValue(), charset.newEncoder()); buf.flip(); output.write(buf); } }
3)建立MyTextLineCodecFactory
实现了ProtocolCodecFactory接口
package com.mai.mina.diyCodecFilter; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; public class MyTextLineCodecFactory implements ProtocolCodecFactory{ @Override public ProtocolDecoder getDecoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return new MyTextLineDecoder(); } @Override public ProtocolEncoder getEncoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return new MyTextLineEncoder(); } }2.2.2 建立服务器端资源(包括Acceptor的配置、Handler建立)
1). 建立自定义IoHandler(MyServerHandleDemo1)
实现了IoHandler接口。
package com.mai.mina.diyChat; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.logging.Logger; import org.apache.mina.core.future.CloseFuture; import org.apache.mina.core.future.IoFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class MyServerHandleDemo1 implements IoHandler{ private Logger logger = Logger.getLogger(this.getClass().getName()); @Override public void exceptionCaught(IoSession session, Throwable arg1) throws Exception { // TODO Auto-generated method stub logger.warning("服务器启动发生异常,have a exception : " + arg1.getMessage()); } @Override public void messageReceived(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub String messageStr = message.toString(); DateFormat format = new SimpleDateFormat("yyyy-MM-dd H:m:s"); String dateStr = format.format(new Date()); logger.info(messageStr + "\t" + dateStr); Collection2).建立Acceptor ,同时也充当Server端的启动类 (SimpleMinaServer)sessions = session.getService().getManagedSessions().values(); for(IoSession tempSession : sessions){ tempSession.write(messageStr + "\t" + dateStr); } } @Override public void messageSent(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub logger.info("服务器成功发送信息: " + message.toString()); } @Override public void sessionClosed(IoSession session) throws Exception { // TODO Auto-generated method stub logger.info("there is a session closed"); CloseFuture future = session.close(true); future.addListener(new IoFutureListener(){ public void operationComplete(IoFuture future){ if(future instanceof CloseFuture){ ((CloseFuture)future).setClosed(); logger.info("have do the future set to closed"); } } }); } @Override public void sessionCreated(IoSession session) throws Exception { // TODO Auto-generated method stub logger.info("there is a session created"); session.write("welcome to the chat room"); } @Override public void sessionIdle(IoSession session, IdleStatus arg1) throws Exception { // TODO Auto-generated method stub logger.info(session.getId() + "(SesssionID) is idle in the satate-->" + arg1); } @Override public void sessionOpened(IoSession arg0) throws Exception { // TODO Auto-generated method stub } }
package com.mai.mina.diyChat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LogLevel; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.SocketAcceptor; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class SimpleMinaServer { SocketAcceptor acceptor = null; SimpleMinaServer(){ acceptor = new NioSocketAcceptor(); } public boolean bind(){ acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter( new MyTextLineCodecFactory()); //配置CodecFactory LoggingFilter log = new LoggingFilter(); log.setMessageReceivedLogLevel(LogLevel.INFO); acceptor.getFilterChain().addLast("logger", log); acceptor.setHandler(new MyServerHandleDemo1()); //配置handler acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30); try { acceptor.bind(new InetSocketAddress(8888)); return true; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } } public static void main(String args[]){ SimpleMinaServer server = new SimpleMinaServer(); if(!server.bind()){ System.out.println("服务器启动失败"); }else{ System.out.println("服务器启动成功"); } } }2.2.3 建立Client端资源:
1)自定义IoHandler(MyClientHandleDemo1)
实现IoHandler接口
package com.mai.mina.diyChat; import java.util.logging.Logger; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; public class MyClientHandleDemo1 extends IoHandlerAdapter{ private ChatPanel messagePanel = null; private Logger logger = Logger.getLogger(this.getClass().getName()); MyClientHandleDemo1(){ } MyClientHandleDemo1(ChatPanel messagePanel){ this.messagePanel = messagePanel; } public void messageReceived(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub String messageStr = message.toString(); logger.info("receive a message is : " + messageStr); if(messagePanel != null) messagePanel.showMsg(messageStr); } public void messageSent(IoSession session , Object message) throws Exception{ logger.info("客户端发了一个信息:" + message.toString()); } }2) 建立Connector (SimpleMinaClient)
package com.mai.mina.diyChat; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.CloseFuture; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LogLevel; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.SocketConnector; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class SimpleMinaClient { public SocketConnector connector = null; public ConnectFuture future; public IoSession session = null; private ChatPanel messagePanel = null; SimpleMinaClient(){ } SimpleMinaClient(ChatPanel messagePanel){ this.messagePanel = messagePanel; } boolean connect(){ try{ connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(3000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter( new MyTextLineCodecFactory()); LoggingFilter log = new LoggingFilter(); log.setMessageReceivedLogLevel(LogLevel.INFO); connector.getFilterChain().addLast("logger", log); connector.setHandler(new MyClientHandleDemo1(messagePanel)); future = connector.connect(new InetSocketAddress("127.0.0.1" , 8888)); future.awaitUninterruptibly(); session = future.getSession(); return true; }catch(Exception e){ e.printStackTrace(); return false; } } public void setAttribute(Object key , Object value){ session.setAttribute(key, value); } void sentMsg(String message){ session.write(message); } boolean close(){ CloseFuture future = session.getCloseFuture(); future.awaitUninterruptibly(1000); connector.dispose(); return true; } public SocketConnector getConnector() { return connector; } public IoSession getSession() { return session; } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub SimpleMinaClient client = new SimpleMinaClient(); if(client.connect()){ client.sentMsg("hello , sever !"); client.close(); } } }
到这里,基本的Mina通信基础就建立好了。
接下来实现一个客户端的GUI界面,方便实际功能的建立和信息交互的演示。
2.2.4 Client Gui界面的建立。(ChatPanel -通过使用SimpleMinaClient来提供实际通信功能)
package com.mai.mina.diyChat; import java.awt.BorderLayout; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JLabel; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.JTextField; import org.apache.commons.lang.math.RandomUtils; public class ChatPanel extends javax.swing.JPanel { private JPanel northPanel; private JLabel headLabel; private JScrollPane jScrollPane1; private JScrollPane jScrollPane2; private JButton exitB; private JButton clearMsgB; private JButton sentB; private JButton connectB; private JTextArea messageText; private JTextField nameText; private JLabel nameLabel; private JTextArea messageArea; private JPanel southPanel; private SimpleMinaClient client = null; private boolean connected = false; private String username = null; { //Set Look & Feel try { javax.swing.UIManager.setLookAndFeel("com.sun.java.swing.plaf.windows.WindowsLookAndFeel"); } catch(Exception e) { e.printStackTrace(); } } public void connect(){ if(client.connect()){ username = nameText.getText().trim(); if(username == null || "".equals(username)){ username = "游客" + RandomUtils.nextInt(1000); nameText.setText(username); } connected = true; dealUIWithFlag(); }else{ connected = false; dealUIWithFlag(); showMsg("连接服务器失败。。。"); } } public void showMsg(String msg){ messageArea.append(msg); messageArea.append("\n"); messageArea.selectAll(); messageArea.lostFocus(null, this); } public void sentMsg(){ String message = username + ":" + messageText.getText(); client.sentMsg(message); messageText.setText(""); messageText.requestFocus(); } public void dealUIWithFlag(){ if(connected){ nameText.setEnabled(false); connectB.setEnabled(false); sentB.setEnabled(true); clearMsgB.setEnabled(true); exitB.setEnabled(true); }else{ nameText.setEnabled(true); connectB.setEnabled(true); sentB.setEnabled(false); clearMsgB.setEnabled(false); exitB.setEnabled(false); } } public void closeTheClient(){ if(client.close()){ showMsg("连接已断开..."); connected = false; dealUIWithFlag(); }else{ showMsg("无法断开连接..."); } } public static void main(String[] args) { JFrame frame = new JFrame(); frame.getContentPane().add(new ChatPanel()); frame.addWindowListener(new WindowAdapter(){ public void windowClosing(WindowEvent e){ System.exit(0); } }); frame.pack(); frame.setLocationByPlatform(true); frame.setVisible(true); } public ChatPanel() { super(); client = new SimpleMinaClient(this); initGUI(); dealUIWithFlag(); } private void initGUI() { try { this.setPreferredSize(new java.awt.Dimension(400, 339)); this.setLayout(null); { northPanel = new JPanel(); BorderLayout northPanelLayout = new BorderLayout(); northPanel.setLayout(northPanelLayout); this.add(northPanel); northPanel.setBounds(0, 0, 400, 188); { headLabel = new JLabel(); northPanel.add(headLabel, BorderLayout.NORTH); headLabel.setText("\u6b22\u8fce\u4f7f\u7528 (\u6d4b\u8bd5Ip:port --> 127.0.0.1:8888)"); headLabel.setPreferredSize(new java.awt.Dimension(397, 19)); } { jScrollPane1 = new JScrollPane(); northPanel.add(jScrollPane1, BorderLayout.CENTER); jScrollPane1.setPreferredSize(new java.awt.Dimension(400, 169)); { messageArea = new JTextArea(); jScrollPane1.setViewportView(messageArea); messageArea.setPreferredSize(new java.awt.Dimension(398, 145)); messageArea.setEditable(false); messageArea.setLineWrap(true); messageArea.setWrapStyleWord(true); } } } { southPanel = new JPanel(); this.add(southPanel); southPanel.setBounds(0, 194, 400, 145); southPanel.setLayout(null); { nameLabel = new JLabel(); southPanel.add(nameLabel); nameLabel.setText("\u6635\u79f0:"); nameLabel.setBounds(10, 12, 35, 15); } { nameText = new JTextField(); southPanel.add(nameText); nameText.setText("\u6e38\u5ba2"); nameText.setBounds(45, 9, 96, 21); } { jScrollPane2 = new JScrollPane(); southPanel.add(jScrollPane2); jScrollPane2.setBounds(15, 37, 364, 69); { messageText = new JTextArea(); jScrollPane2.setViewportView(messageText); messageText.setBounds(101, 72, 362, 75); messageText.setPreferredSize(new java.awt.Dimension(362, 54)); messageText.setLineWrap(true); messageText.setWrapStyleWord(true); } } { connectB = new JButton(); southPanel.add(connectB); connectB.setText("\u8fde\u63a5\u670d\u52a1\u5668"); connectB.setBounds(179, 8, 93, 23); connectB.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent evt) { System.out.println("connectB.actionPerformed, event="+evt); //TODO add your code for connectB.actionPerformed connect(); } }); } { sentB = new JButton(); southPanel.add(sentB); sentB.setText("\u53d1\u9001"); sentB.setBounds(261, 116, 57, 23); sentB.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent evt) { System.out.println("sentB.actionPerformed, event="+evt); //TODO add your code for sentB.actionPerformed sentMsg(); } }); } { clearMsgB = new JButton(); southPanel.add(clearMsgB); clearMsgB.setText("\u6e05\u7a7a"); clearMsgB.setBounds(324, 116, 57, 23); clearMsgB.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent evt) { System.out.println("clearMsgB.actionPerformed, event="+evt); //TODO add your code for clearMsgB.actionPerformed messageText.setText(""); } }); } { exitB = new JButton(); southPanel.add(exitB); exitB.setText("\u6ce8\u9500"); exitB.setBounds(282, 8, 57, 23); exitB.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent evt) { System.out.println("exitB.actionPerformed, event="+evt); //TODO add your code for exitB.actionPerformed closeTheClient(); } }); } } } catch (Exception e) { e.printStackTrace(); } } }3. 运行结果
首先启动服务器端,即运行SimpleMinaServer类 , 启动成功时会在控制台中打印出“服务器启动成功"
接下来运行客户端ChatPanel。
note: 上面只是一个简单的信息交互,其实使用Mina比较常用的还是在自定义协议处理这块。
来自:http://www.cnblogs.com/mailingfeng/archive/2012/02/08/2342522.html