把Socket撸成RxSocket
dmsa4941
8年前
<h3>0 .概述</h3> <p>最近,公司的项目需要重构代码,因为个人最近接触了Rx这个高大上的东东,</p> <p>而,项目原来是满满的回调地狱...项目是基于Socket的长连接,所以呢,</p> <p>从最底层开始,把普通的Socket撸成Rx!</p> <h3>1 .设计模式</h3> <p>首先,这个RxSocket是唯一的,也就是,全局唯一咯,</p> <p>嗯,设计成单例。</p> <pre> <code class="language-java">public static RxSocket getInstance() { RxSocket rxSocket = defaultInstance; if (defaultInstance == null) { synchronized (RxSocket.class) { rxSocket = defaultInstance; if (defaultInstance == null) { rxSocket = new RxSocket(); defaultInstance = rxSocket; } } } return rxSocket; }</code></pre> <p>这是双重检查锁定(double-checked locking)的单例,注意 defaultInstance 字段必须声明为 volatile。</p> <h3>2 .对外的接口/方法</h3> <p>说到Socket,第一个想到的就是连接和读写。而外界想知道的,只是写入是否成功、</p> <p>连接是否成功,以及读到了什么数据。所以定义:</p> <pre> <code class="language-java">public Observable<Boolean> connectRx(String ip, int port); public Observable<Boolean> disConnect(); public Observable<byte[]> read(); public Observable<Boolean> write(ByteBuffer buffer);</code></pre> <p>还有一点,应该要有一个方法,让外界知道,这个Socket的状态,也就是监听方法:</p> <p><code>public Observable&lt;SocketStatus&gt; socketStatusListener();</code></p> <h3>3 .具体代码实现</h3> <pre> <code class="language-java">package chestnut.RxSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.TimeUnit; import chestnut.utils.LogUtils; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.subjects.SerializedSubject; import rx.subjects.Subject; /** * Created by Chestnut on 2016/12/18. 
*/ public class RxSocket { //本类的常量 private static final String TAG = "RxSocket"; private static final boolean OpenLog = true; private static final long WRITE_TIME_OUT = 3000; private static final long CONNECT_TIME_OUT = 3000; //单例 private Subject<Object,byte[]> readSubject; private Subject<Object,SocketStatus> connectStatus; private static volatile RxSocket defaultInstance; private RxSocket() { readSubject = new SerializedSubject(PublishSubject.create()); connectStatus = new SerializedSubject(PublishSubject.create()); } public static RxSocket getInstance() { RxSocket rxSocket = defaultInstance; if (defaultInstance == null) { synchronized (RxSocket.class) { rxSocket = defaultInstance; if (defaultInstance == null) { rxSocket = new RxSocket(); defaultInstance = rxSocket; } } } return rxSocket; } //变量 private SocketStatus socketStatus = SocketStatus.DIS_CONNECT; private Selector selector = null; private SocketChannel socketChannel = null; private SelectionKey selectionKey = null; private ReadThread readThread = null; private boolean isReadThreadAlive = true; private SocketReconnectCallback socketReconnectCallback = null; //方法 /** * 监听Socket的状态 * @return */ public Observable<SocketStatus> socketStatusListener () { return connectStatus; } /** * 建立Socket连接,只是尝试建立一次 * @param ip IP or 域名 * @param port 端口 * @return Rx true or false */ public Observable<Boolean> connectRx(String ip, int port) { return Observable .create(new Observable.OnSubscribe<Boolean>() { @Override public void call(Subscriber<? 
super Boolean> subscriber) { //正在连接 if (socketStatus == SocketStatus.CONNECTING) { subscriber.onNext(false); subscriber.onCompleted(); return; } //未连接 | 已经连接,关闭Socket socketStatus = SocketStatus.DIS_CONNECT; isReadThreadAlive = false; readThread = null; if (selector!=null) try { selector.close(); } catch (Exception e) { LogUtils.i(OpenLog,TAG,"selector.close"); } if (selectionKey!=null) try { selectionKey.cancel(); } catch (Exception e) { LogUtils.i(OpenLog,TAG,"selectionKey.cancel"); } if (socketChannel!=null) try { socketChannel.close(); } catch (Exception e) { LogUtils.i(OpenLog,TAG,"socketChannel.close"); } //重启Socket isReadThreadAlive = true; readThread = new ReadThread(ip,port); readThread.start(); socketReconnectCallback = new SocketReconnectCallback() { @Override public void onSuccess() { subscriber.onNext(true); subscriber.onCompleted(); } @Override public void onFail(String msg) { LogUtils.i(OpenLog,TAG,"connectRx:"+msg); subscriber.onNext(false); subscriber.onCompleted(); } }; } }) .subscribeOn(Schedulers.newThread()) .timeout(CONNECT_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false)); } /** * 断开当前的Socket * @return Rx true or false */ public Observable<Boolean> disConnect() { return Observable.create(new Observable.OnSubscribe<Boolean>() { @Override public void call(Subscriber<? 
super Boolean> subscriber) { try { if (socketStatus == SocketStatus.DIS_CONNECT) { subscriber.onNext(true); subscriber.onCompleted(); } else { socketStatus = SocketStatus.DIS_CONNECT; isReadThreadAlive = false; readThread = null; if (selector!=null) try { selector.close(); } catch (Exception e) { LogUtils.i(OpenLog,TAG,"selector.close"); } if (selectionKey!=null) try { selectionKey.cancel(); } catch (Exception e) { LogUtils.i(OpenLog,TAG,"selectionKey.cancel"); } if (socketChannel!=null) try { socketChannel.close(); } catch (Exception e) { LogUtils.i(OpenLog,TAG,"socketChannel.close"); } subscriber.onNext(true); subscriber.onCompleted(); } } catch (Exception e) { subscriber.onNext(false); subscriber.onCompleted(); } } }); } /** * 读取Socket的消息 * @return Rx error 或者 有数据 */ public Observable<byte[]> read() { if (socketStatus != SocketStatus.CONNECTED) return Observable.create(new Observable.OnSubscribe<byte[]>() { @Override public void call(Subscriber<? super byte[]> subscriber) { subscriber.onError(new Throwable("Socket Dis Connect")); } }); else return readSubject; } /** * 向Socket写消息 * @param buffer 数据包 * @return Rx true or false */ public Observable<Boolean> write(ByteBuffer buffer) { return Observable .create(new Observable.OnSubscribe<Boolean>() { @Override public void call(Subscriber<? super Boolean> subscriber) { if (socketStatus != SocketStatus.CONNECTED) { subscriber.onNext(false); subscriber.onCompleted(); } else { if (socketChannel!=null && socketChannel.isConnected()) { try { int result = socketChannel.write(buffer); if (result==0) { LogUtils.i(OpenLog,TAG,"write."+"服务器断开链接"); } else if (result<0) { LogUtils.e(OpenLog, TAG, "write." 
+ "发送出错"); } else { subscriber.onNext(true); subscriber.onCompleted(); } } catch (Exception e) { LogUtils.i(OpenLog,TAG,"write."+e.getMessage()); subscriber.onNext(false); subscriber.onCompleted(); } } else { LogUtils.i(OpenLog,TAG,"write."+"close"); subscriber.onNext(false); subscriber.onCompleted(); } } } }) .subscribeOn(Schedulers.newThread()) .timeout(WRITE_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false)); } //类 private class ReadThread extends Thread { private String ip; private int port; ReadThread(String ip, int port) { this.ip = ip; this.port = port; } @Override public void run() { LogUtils.i(OpenLog,TAG,"ReadThread:"+"start"); while (isReadThreadAlive) { //连接 if (socketStatus == SocketStatus.DIS_CONNECT) { try { if (selectionKey != null) selectionKey.cancel(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); selector = Selector.open(); socketChannel.connect(new InetSocketAddress(ip, port)); selectionKey = socketChannel.register(selector, SelectionKey.OP_CONNECT); socketStatus = SocketStatus.CONNECTING; connectStatus.onNext(SocketStatus.CONNECTING); } catch (Exception e) { isReadThreadAlive = false; socketStatus = SocketStatus.DIS_CONNECT; connectStatus.onNext(SocketStatus.DIS_CONNECT); LogUtils.e(OpenLog, TAG, "ReadThread:init:" + e.getMessage()); if (socketReconnectCallback!=null) socketReconnectCallback.onFail("SocketConnectFail1"); } } //读取 else if (socketStatus == SocketStatus.CONNECTING || socketStatus == SocketStatus.CONNECTED) { try { selector.select(); Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); if (key.isConnectable()) { if (socketChannel.isConnectionPending()) { try { socketChannel.finishConnect(); socketStatus = SocketStatus.CONNECTED; connectStatus.onNext(SocketStatus.CONNECTED); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); if (socketReconnectCallback!=null) 
socketReconnectCallback.onSuccess(); } catch (Exception e) { isReadThreadAlive = false; socketStatus = SocketStatus.DIS_CONNECT; connectStatus.onNext(SocketStatus.DIS_CONNECT); LogUtils.e(OpenLog, TAG, "ReadThread:finish:" + e.getMessage()); if (socketReconnectCallback!=null) socketReconnectCallback.onFail("SocketConnectFail2"); } } } else if (key.isReadable()) { ByteBuffer buf = ByteBuffer.allocate(10000); int length = socketChannel.read(buf); if (length <= 0) { LogUtils.e(OpenLog, TAG, "服务器主动断开链接!"); } else { byte[] bytes = new byte[length]; for (int i = 0; i < length; i++) { bytes[i] = buf.get(i); } readSubject.onNext(bytes); } } } it.remove(); } catch (Exception e) { isReadThreadAlive = false; socketStatus = SocketStatus.DIS_CONNECT; connectStatus.onNext(SocketStatus.DIS_CONNECT); LogUtils.e(OpenLog, TAG, "ReadThread:read:" + e.getMessage()); if (socketReconnectCallback!=null) socketReconnectCallback.onFail("SocketConnectFail3"); } } } } } //枚举 && 接口 public enum SocketStatus { DIS_CONNECT, CONNECTING, CONNECTED, } private interface SocketReconnectCallback { void onSuccess(); void onFail(String msg); } }</code></pre> <p>额,好像有点长,这个Socket是NIO包的Socket,里面只是开启了一条线程。</p> <h3>4 .注意</h3> <ul> <li> <p>之所以放出一个监听方法,我想的是,Socket连接上后,有可能会被断开,</p> <p>这样,就需要做一个重连的策略,当然,这个策略看项目的要求,</p> <p>因而,我把其对外开放了。你可以监听这个方法,去做Socket的重连策略。</p> </li> <li> <p>RxSokcet的读方法,需要注意,要在适当的时候去解除订阅。</p> <p>还有,Socket状态的监听也是。</p> </li> <li> <p>最后,有哪些不合理的地方,各位大老要好好教导一下小弟~</p> </li> </ul> <p> </p> <p>来自:http://www.jianshu.com/p/27e4f714cfa3</p> <p> </p>