把Socket撸成RxSocket

dmsa4941 8年前
   <h3>0 .概述</h3>    <p>最近,公司的项目需要重构代码,因为个人最近接触了Rx这个高大上的东东,</p>    <p>而,项目原来是满满的回调地狱...项目是基于Socket的长连接,所以呢,</p>    <p>从最底层开始,把普通的Socket撸成Rx!</p>    <h3>1 .设计模式</h3>    <p>首先,这个RxSocket是唯一的,也就是,全局唯一咯,</p>    <p>嗯,设计成单例。</p>    <pre>  <code class="language-java">public static RxSocket getInstance() {          RxSocket rxSocket = defaultInstance;          if (defaultInstance == null) {              synchronized (RxSocket.class) {                  rxSocket = defaultInstance;                  if (defaultInstance == null) {                      rxSocket = new RxSocket();                      defaultInstance = rxSocket;                  }              }          }          return rxSocket;      }</code></pre>    <p>双重加锁型的单例。</p>    <h3>2 .对外的接口/方法</h3>    <p>Socket,第一个想到就是连接,读写,而,我们外界想知道的,就只是是否</p>    <p>写,连接是否成功,和读到啥数据。所以定义:</p>    <pre>  <code class="language-java">public Observable<Boolean> connectRx(String ip, int port);  public Observable<Boolean> disConnect();  public Observable<byte[]> read();  public Observable<Boolean> write(ByteBuffer buffer);</code></pre>    <p>还有一点,应该要有一个方法,让外界知道,这个Socket的状态,也就是监听方法:</p>    <p>public Observable<SocketStatus> socketStatusListener ();</p>    <h3>3 .具体代码实现</h3>    <pre>  <code class="language-java">package chestnut.RxSocket;    import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.SocketChannel;  import java.util.Iterator;  import java.util.concurrent.TimeUnit;    import chestnut.utils.LogUtils;  import rx.Observable;  import rx.Subscriber;  import rx.schedulers.Schedulers;  import rx.subjects.PublishSubject;  import rx.subjects.SerializedSubject;  import rx.subjects.Subject;    /**   * Created by Chestnut on 2016/12/18.   */    public class RxSocket {        //本类的常量      private static final String TAG = "RxSocket";      private static final boolean OpenLog = true;      private static final long WRITE_TIME_OUT = 3000;      private static final long CONNECT_TIME_OUT = 3000;        //单例      private Subject<Object,byte[]> readSubject;      private Subject<Object,SocketStatus> connectStatus;      private static volatile RxSocket defaultInstance;      private RxSocket() {          readSubject = new SerializedSubject(PublishSubject.create());          connectStatus = new SerializedSubject(PublishSubject.create());      }      public static RxSocket getInstance() {          RxSocket rxSocket = defaultInstance;          if (defaultInstance == null) {              synchronized (RxSocket.class) {                  rxSocket = defaultInstance;                  if (defaultInstance == null) {                      rxSocket = new RxSocket();                      defaultInstance = rxSocket;                  }              }          }          return rxSocket;      }        //变量      private SocketStatus socketStatus = SocketStatus.DIS_CONNECT;      private Selector selector = null;      private SocketChannel socketChannel = null;      private SelectionKey selectionKey = null;      private ReadThread readThread = null;      private boolean isReadThreadAlive = true;      private SocketReconnectCallback socketReconnectCallback = null;        //方法        /**       * 监听Socket的状态       * @return       */      public Observable<SocketStatus> socketStatusListener () {          return connectStatus;      }        /**       * 建立Socket连接,只是尝试建立一次       * @param ip    IP or 域名       * @param port  端口       * @return  Rx true or false       */      public Observable<Boolean> connectRx(String ip, int port) {          return Observable                  .create(new Observable.OnSubscribe<Boolean>() {                      @Override                      public void call(Subscriber<? super Boolean> subscriber) {                            //正在连接                          if (socketStatus == SocketStatus.CONNECTING) {                              subscriber.onNext(false);                              subscriber.onCompleted();                              return;                          }                            //未连接 | 已经连接,关闭Socket                          socketStatus = SocketStatus.DIS_CONNECT;                          isReadThreadAlive = false;                          readThread = null;                          if (selector!=null)                              try {                                  selector.close();                              } catch (Exception e) {                                  LogUtils.i(OpenLog,TAG,"selector.close");                              }                          if (selectionKey!=null)                              try {                                  selectionKey.cancel();                              } catch (Exception e) {                                  LogUtils.i(OpenLog,TAG,"selectionKey.cancel");                              }                          if (socketChannel!=null)                              try {                                  socketChannel.close();                              } catch (Exception e) {                                  LogUtils.i(OpenLog,TAG,"socketChannel.close");                              }                            //重启Socket                          isReadThreadAlive = true;                          readThread = new ReadThread(ip,port);                          readThread.start();                          socketReconnectCallback = new SocketReconnectCallback() {                              @Override                              public void onSuccess() {                                  subscriber.onNext(true);                                  subscriber.onCompleted();                              }                                @Override                              public void onFail(String msg) {                                  LogUtils.i(OpenLog,TAG,"connectRx:"+msg);                                  subscriber.onNext(false);                                  subscriber.onCompleted();                              }                          };                      }                  })                  .subscribeOn(Schedulers.newThread())                  .timeout(CONNECT_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));      }        /**       * 断开当前的Socket       * @return Rx true or false       */      public Observable<Boolean> disConnect() {          return Observable.create(new Observable.OnSubscribe<Boolean>() {              @Override              public void call(Subscriber<? super Boolean> subscriber) {                  try {                      if (socketStatus == SocketStatus.DIS_CONNECT) {                          subscriber.onNext(true);                          subscriber.onCompleted();                      }                      else {                          socketStatus = SocketStatus.DIS_CONNECT;                          isReadThreadAlive = false;                          readThread = null;                          if (selector!=null)                              try {                                  selector.close();                              } catch (Exception e) {                                  LogUtils.i(OpenLog,TAG,"selector.close");                              }                          if (selectionKey!=null)                              try {                                  selectionKey.cancel();                              } catch (Exception e) {                                  LogUtils.i(OpenLog,TAG,"selectionKey.cancel");                              }                          if (socketChannel!=null)                              try {                                  socketChannel.close();                              } catch (Exception e) {                                  LogUtils.i(OpenLog,TAG,"socketChannel.close");                              }                          subscriber.onNext(true);                          subscriber.onCompleted();                      }                  } catch (Exception e) {                      subscriber.onNext(false);                      subscriber.onCompleted();                  }              }          });      }        /**       * 读取Socket的消息       * @return  Rx error 或者 有数据       */      public Observable<byte[]> read() {          if (socketStatus != SocketStatus.CONNECTED)              return Observable.create(new Observable.OnSubscribe<byte[]>() {                  @Override                  public void call(Subscriber<? super byte[]> subscriber) {                      subscriber.onError(new Throwable("Socket Dis Connect"));                  }              });          else              return readSubject;      }        /**       * 向Socket写消息       * @param buffer    数据包       * @return  Rx true or false       */      public Observable<Boolean> write(ByteBuffer buffer) {          return Observable                  .create(new Observable.OnSubscribe<Boolean>() {                      @Override                      public void call(Subscriber<? super Boolean> subscriber) {                          if (socketStatus != SocketStatus.CONNECTED) {                              subscriber.onNext(false);                              subscriber.onCompleted();                          }                          else {                              if (socketChannel!=null && socketChannel.isConnected()) {                                  try {                                      int result = socketChannel.write(buffer);                                      if (result==0) {                                          LogUtils.i(OpenLog,TAG,"write."+"服务器断开链接");                                      }                                      else if (result<0) {                                          LogUtils.e(OpenLog, TAG, "write." + "发送出错");                                      }                                      else {                                          subscriber.onNext(true);                                          subscriber.onCompleted();                                      }                                  } catch (Exception e) {                                      LogUtils.i(OpenLog,TAG,"write."+e.getMessage());                                      subscriber.onNext(false);                                      subscriber.onCompleted();                                  }                              }                              else {                                  LogUtils.i(OpenLog,TAG,"write."+"close");                                  subscriber.onNext(false);                                  subscriber.onCompleted();                              }                          }                      }                  })                  .subscribeOn(Schedulers.newThread())                  .timeout(WRITE_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));      }        //类      private class ReadThread extends Thread {          private String ip;          private int port;          ReadThread(String ip, int port) {              this.ip = ip;              this.port = port;          }          @Override          public void run() {              LogUtils.i(OpenLog,TAG,"ReadThread:"+"start");              while (isReadThreadAlive) {                  //连接                  if (socketStatus == SocketStatus.DIS_CONNECT) {                      try {                          if (selectionKey != null) selectionKey.cancel();                          socketChannel = SocketChannel.open();                          socketChannel.configureBlocking(false);                          selector = Selector.open();                          socketChannel.connect(new InetSocketAddress(ip, port));                          selectionKey = socketChannel.register(selector, SelectionKey.OP_CONNECT);                          socketStatus = SocketStatus.CONNECTING;                          connectStatus.onNext(SocketStatus.CONNECTING);                      } catch (Exception e) {                          isReadThreadAlive = false;                          socketStatus = SocketStatus.DIS_CONNECT;                          connectStatus.onNext(SocketStatus.DIS_CONNECT);                          LogUtils.e(OpenLog, TAG, "ReadThread:init:" + e.getMessage());                          if (socketReconnectCallback!=null)                              socketReconnectCallback.onFail("SocketConnectFail1");                      }                  }                  //读取                  else if (socketStatus == SocketStatus.CONNECTING || socketStatus  == SocketStatus.CONNECTED) {                      try {                          selector.select();                          Iterator<SelectionKey> it = selector.selectedKeys().iterator();                          while (it.hasNext()) {                              SelectionKey key = it.next();                              if (key.isConnectable()) {                                  if (socketChannel.isConnectionPending()) {                                      try {                                          socketChannel.finishConnect();                                          socketStatus = SocketStatus.CONNECTED;                                          connectStatus.onNext(SocketStatus.CONNECTED);                                          socketChannel.configureBlocking(false);                                          socketChannel.register(selector, SelectionKey.OP_READ);                                          if (socketReconnectCallback!=null)                                              socketReconnectCallback.onSuccess();                                      } catch (Exception e) {                                          isReadThreadAlive = false;                                          socketStatus = SocketStatus.DIS_CONNECT;                                          connectStatus.onNext(SocketStatus.DIS_CONNECT);                                          LogUtils.e(OpenLog, TAG, "ReadThread:finish:" + e.getMessage());                                          if (socketReconnectCallback!=null)                                              socketReconnectCallback.onFail("SocketConnectFail2");                                      }                                  }                              } else if (key.isReadable()) {                                  ByteBuffer buf = ByteBuffer.allocate(10000);                                  int length = socketChannel.read(buf);                                  if (length <= 0) {                                      LogUtils.e(OpenLog, TAG, "服务器主动断开链接!");                                  } else {                                      byte[] bytes = new byte[length];                                      for (int i = 0; i < length; i++) {                                          bytes[i] = buf.get(i);                                      }                                      readSubject.onNext(bytes);                                  }                              }                          }                          it.remove();                      } catch (Exception e) {                          isReadThreadAlive = false;                          socketStatus = SocketStatus.DIS_CONNECT;                          connectStatus.onNext(SocketStatus.DIS_CONNECT);                          LogUtils.e(OpenLog, TAG, "ReadThread:read:" + e.getMessage());                          if (socketReconnectCallback!=null)                              socketReconnectCallback.onFail("SocketConnectFail3");                      }                  }              }          }      }        //枚举 && 接口      public enum SocketStatus {          DIS_CONNECT,          CONNECTING,          CONNECTED,      }        private interface SocketReconnectCallback {          void onSuccess();          void onFail(String msg);      }    }</code></pre>    <p>额,好像有点长,这个Socket是NIO包的Socket,里面只是开启了一条线程。</p>    <h3>4 .注意</h3>    <ul>     <li> <p>之所以放出一个监听方法,我想的是,Socket连接上后,有可能会被断开,</p> <p>这样,就需要做一个重连的策略,当然,这个策略看项目的要求,</p> <p>因而,我把其对外开放了。你可以监听这个方法,去做Socket的重连策略。</p> </li>     <li> <p>RxSokcet的读方法,需要注意,要在适当的时候去解除订阅。</p> <p>还有,Socket状态的监听也是。</p> </li>     <li> <p>最后,有哪些不合理的地方,各位大老要好好教导一下小弟~</p> </li>    </ul>    <p> </p>    <p>来自:http://www.jianshu.com/p/27e4f714cfa3</p>    <p> </p>