Java NIO之【Scalable IO in Java】
381355763
8年前
<p>看完了并发网的NIO教程,是否有种意犹未尽的感觉。正常情况下,答案应该是肯定的。那我们下面来看下Doug Lea大神写的 Scalable IO in Java ,直接可以下载英文版pdf。这边就当边学习边翻译了。</p> <h3>网络服务</h3> <p>大部分网路服务有着相同的体系:</p> <ul> <li>读取请求(Read request)</li> <li>对请求进行解码(Decode request)</li> <li>处理业务逻辑(Process service)</li> <li>对返回值进行编码(Encode reply)</li> <li>发送返回值(Send reply)</li> </ul> <p>下面我们来看下传统的设计模型:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/5e5d79c891d27dd941609e4cde490568.png"></p> <p>其中,每一个handler有可能都要新起一个线程去执行。用伪代码模拟如下:</p> <pre> <code class="language-java">public classServce{ publicstaticvoidmain(String[] args)throwsIOException{ ServerSocket ss = new ServerSocket(1234); while (!Thread.interrupted()) { new Thread(new Handler(ss.accept())).start(); } } static classHandlerimplementsRunnable{ final Socket socket; Handler(Socket socket) { this.socket = socket; } @Override publicvoidrun(){ try { byte[] input = new byte[1024]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException e) { e.printStackTrace(); } } private byte[] process(byte[] cmd) { return null; } } } </code></pre> <p>从伪代码可以看出传统I/O模型的雏形,需要为每一个接收到的socket连接新建一个线程去执行具体的业务逻辑。</p> <h3>可扩展性的目标</h3> <p>首先,肯定是不满意上面传统的I/O设计模型,才有接下来的讨论。无休止地新建线程去执行具体业务逻辑,最终无疑会拖垮整个系统。当然,也很容易想到,可以用线程池,但是这样虽然可以限制线程数量,但是并发数也因此被限制了,所以并不是解决之道。那我们就来看下可扩展性I/O的目标是什么:</p> <ul> <li>高负载情况下的优雅降级</li> <li>硬件的升级能持续地给系统带来性能提升</li> <li>当然也包含可用性和性能的目标:低延迟、高负载等</li> </ul> <h3>分治法(Divide and Conquer)</h3> <p>分治法一般是解决可扩展性的最好的途径。将处理流程分成一些小的任务,每一个任务都包含一个非阻塞操作。当任务准备好的时候去执行它。这里,一个I/O事件通常被作为触发器。比如下面:</p> <p><img src="https://simg.open-open.com/show/e6109984f26dc4f84241ae8d9dbf8341.png"></p> <p>说实话,上面这一话配上这张图,不是很能理解。被分成的小任务是整个handler,还是比如说read这样一个操作。感觉是把handler拆成一个个小任务,再往下学吧,应该会越来越清晰。</p> <p>java.nio提供如下基本的机制:</p> <ul> <li>非阻塞的读和写</li> <li>与感兴趣的I/O事件相关联的任务分配机制</li> </ul> <h3>事件驱动设计</h3> <p>一系列事件驱动设计使得无限可能。这种方式通常比其他方案更有效,原因如下:</p> <ul> <li>占用资源少:不需要为每个客户端开启一个新线程</li> <li>开销少:减少上下文切换的开销,减少锁的使用</li> </ul> <p>但是,通常也更难编码,原因如下:</p> <ul> <li>必须拆分成许多小的非阻塞单元,但是无法消除所有的阻塞动作,比如说GC、页错误等</li> <li>必须持续追踪服务的逻辑状态</li> </ul> <h3>Reactor模式</h3> <p>Reactor模式有如下几个特征:</p> <ul> <li>Reactor通过调度相应的处理程序来相应I/O事件</li> <li>处理程序执行非阻塞操作</li> <li>通过绑定处理程序来管理事件。</li> </ul> <p>我们先来看下单个线程版本的模型图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/7222ffd6927a5f78febaee6a140bd0e4.png"></p> <p>java.nio中的Channel、Buffer、Selector、SelectionKey类可以支持该模型。上图如果第一眼不能很好地理解的话,先来看下代码,涉及到两个类。</p> <pre> <code class="language-java">public classReactorimplementsRunnable{ final Selector selector; final ServerSocketChannel serverSocketChannel; Reactor(int port) throws IOException { // 初始化ServerSocketChannel,以非阻塞模式运行 serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); // 初始化Selector selector = Selector.open(); // 将ServerSocketChannel注册到Selector上 SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 在SelectionKey上绑定一个附属对象Acceptor selectionKey.attach(new Acceptor()); } @Override publicvoidrun(){ try { while (!Thread.interrupted()) { // 阻塞直至事件就绪 selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { // 分发 dispatch((SelectionKey)(it.next())); } // 需要自己清除selectedKeys selected.clear(); } }catch (IOException ex) { } } voiddispatch(SelectionKey k){ /** * 获取SelectionKey中的attachment,并执行该attachment的run()方法 * 拿第一个到达的socket连接来看,该attachment为一个Acceptor实例 */ Runnable r = (Runnable)(k.attachment()); if (r != null) { r.run(); } } classAcceptorimplementsRunnable{ publicvoidrun(){ try { // 获取新连接的SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { // 具体的处理逻辑 new Handler(selector, socketChannel); } } catch (IOException ex) { } } } } </code></pre> <pre> <code class="language-java">public classHandlerimplementsRunnable{ final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(1024); ByteBuffer output = ByteBuffer.allocate(1024); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; socket.configureBlocking(false); // 继续在Selector上注册读事件,此时attachment为当前Handler实例 sk = socket.register(sel, SelectionKey.OP_READ, this); // 使选择器上的第一个还没有返回的选择操作立即返回 sel.wakeup(); } booleaninputIsComplete(){ return true; } booleanoutputIsComplete(){ return true; } voidprocess(){ System.out.println("Handle processing..."); } @Override publicvoidrun(){ try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } voidread()throwsIOException{ System.out.println("Handle reading..."); socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // 在SelectionKey上注册写事件 sk.interestOps(SelectionKey.OP_WRITE); } } voidsend()throwsIOException{ System.out.println("Handle writing..."); socket.write(output); if (outputIsComplete()) { //write完就结束了, 关闭select key sk.cancel(); } } } </code></pre> <p>结合模型图和代码,直观的感受是单个线程可以同时处理多个客户端请求了。下面列举下Reactor模式的一些概念:</p> <ul> <li>Reactor:负责响应I/O事件,当检测到一个新的事件,将其发送给相应的Handler去处理</li> <li>Handler:负责处理非阻塞的行为,同时将handler与事件绑定</li> </ul> <p>Reactor为单个线程,需要处理accept连接,同时发送请求到处理器中。由于只有单个线程,所以handler中的业务需要能够快速处理完。当然,还能再改进,可以将具体的业务逻辑放到单独的线程池中去跑,这儿就不实习了。同时,NIO暂时也就看到这里,主要是了解下相关知识,为下面学习Netty做个准备。</p> <p> </p> <p>来自:http://bboyjing.github.io/2017/04/05/Java-NIO之【Scalable-IO-in-Java】/</p> <p> </p>