Netty实现服务端客户端长连接通讯及心跳检测

pdkie1 10年前

      通过netty实现服务端与客户端的长连接通讯,及心跳检测。

       基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

        环境JDK1.8 和netty5

        以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

     设计消息类型:

public enum  MsgType {      PING,ASK,REPLY,LOGIN  }

    Message基类:

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!  public abstract class BaseMsg  implements Serializable {      private static final long serialVersionUID = 1L;      private MsgType type;      //必须唯一,否者会出现channel调用混乱      private String clientId;        //初始化客户端id      public BaseMsg() {          this.clientId = Constants.getClientId();      }        public String getClientId() {          return clientId;      }        public void setClientId(String clientId) {          this.clientId = clientId;      }        public MsgType getType() {          return type;      }        public void setType(MsgType type) {          this.type = type;      }  }

常量设置:

public class Constants {      private static String clientId;      public static String getClientId() {          return clientId;      }      public static void setClientId(String clientId) {          Constants.clientId = clientId;      }  }

登录类型消息:

public class LoginMsg extends BaseMsg {      private String userName;      private String password;      public LoginMsg() {          super();          setType(MsgType.LOGIN);      }        public String getUserName() {          return userName;      }        public void setUserName(String userName) {          this.userName = userName;      }        public String getPassword() {          return password;      }        public void setPassword(String password) {          this.password = password;      }  }

 心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {      public PingMsg() {          super();          setType(MsgType.PING);      }  }

请求类型消息:

public class AskMsg extends BaseMsg {      public AskMsg() {          super();          setType(MsgType.ASK);      }      private AskParams params;        public AskParams getParams() {          return params;      }        public void setParams(AskParams params) {          this.params = params;      }  }  //请求类型参数  //必须实现序列化接口  public class AskParams implements Serializable {      private static final long serialVersionUID = 1L;      private String auth;        public String getAuth() {          return auth;      }        public void setAuth(String auth) {          this.auth = auth;      }  }

响应类型消息:

public class ReplyMsg extends BaseMsg {      public ReplyMsg() {          super();          setType(MsgType.REPLY);      }      private ReplyBody body;        public ReplyBody getBody() {          return body;      }        public void setBody(ReplyBody body) {          this.body = body;      }  }  //相应类型body对像  public class ReplyBody implements Serializable {      private static final long serialVersionUID = 1L;  }  public class ReplyClientBody extends ReplyBody {      private String clientInfo;        public ReplyClientBody(String clientInfo) {          this.clientInfo = clientInfo;      }        public String getClientInfo() {          return clientInfo;      }        public void setClientInfo(String clientInfo) {          this.clientInfo = clientInfo;      }  }  public class ReplyServerBody extends ReplyBody {      private String serverInfo;      public ReplyServerBody(String serverInfo) {          this.serverInfo = serverInfo;      }      public String getServerInfo() {          return serverInfo;      }      public void setServerInfo(String serverInfo) {          this.serverInfo = serverInfo;      }  }

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {      private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();      public static void add(String clientId,SocketChannel socketChannel){          map.put(clientId,socketChannel);      }      public static Channel get(String clientId){         return map.get(clientId);      }      public static void remove(SocketChannel socketChannel){          for (Map.Entry entry:map.entrySet()){              if (entry.getValue()==socketChannel){                  map.remove(entry.getKey());              }          }      }    }

Handler

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {      @Override      public void channelInactive(ChannelHandlerContext ctx) throws Exception {          //channel失效,从Map中移除          NettyChannelMap.remove((SocketChannel)ctx.channel());      }      @Override      protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {            if(MsgType.LOGIN.equals(baseMsg.getType())){              LoginMsg loginMsg=(LoginMsg)baseMsg;              if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){                  //登录成功,把channel存到服务端的map中                  NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());                  System.out.println("client"+loginMsg.getClientId()+" 登录成功");              }          }else{              if(NettyChannelMap.get(baseMsg.getClientId())==null){                      //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录                      LoginMsg loginMsg=new LoginMsg();                      channelHandlerContext.channel().writeAndFlush(loginMsg);              }          }          switch (baseMsg.getType()){              case PING:{                  PingMsg pingMsg=(PingMsg)baseMsg;                  PingMsg replyPing=new PingMsg();                  NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);              }break;              case ASK:{                  //收到客户端的请求                  AskMsg askMsg=(AskMsg)baseMsg;                  if("authToken".equals(askMsg.getParams().getAuth())){                      ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");                      ReplyMsg replyMsg=new ReplyMsg();                      replyMsg.setBody(replyBody);                      NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);                  }              }break;              case REPLY:{                  //收到客户端回复                  ReplyMsg replyMsg=(ReplyMsg)baseMsg;                  ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();                  System.out.println("receive client msg: "+clientBody.getClientInfo());              }break;              default:break;          }          ReferenceCountUtil.release(baseMsg);      }  }

ServerBootstrap:

public class NettyServerBootstrap {      private int port;      private SocketChannel socketChannel;      public NettyServerBootstrap(int port) throws InterruptedException {          this.port = port;          bind();      }        private void bind() throws InterruptedException {          EventLoopGroup boss=new NioEventLoopGroup();          EventLoopGroup worker=new NioEventLoopGroup();          ServerBootstrap bootstrap=new ServerBootstrap();          bootstrap.group(boss,worker);          bootstrap.channel(NioServerSocketChannel.class);          bootstrap.option(ChannelOption.SO_BACKLOG, 128);          //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去          bootstrap.option(ChannelOption.TCP_NODELAY, true);          //保持长连接状态          bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);          bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {              @Override              protected void initChannel(SocketChannel socketChannel) throws Exception {                  ChannelPipeline p = socketChannel.pipeline();                  p.addLast(new ObjectEncoder());                  p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));                  p.addLast(new NettyServerHandler());              }          });          ChannelFuture f= bootstrap.bind(port).sync();          if(f.isSuccess()){              System.out.println("server start---------------");          }      }      public static void main(String []args) throws InterruptedException {          NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);          while (true){              SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");              if(channel!=null){                  AskMsg askMsg=new AskMsg();                  channel.writeAndFlush(askMsg);              }              TimeUnit.SECONDS.sleep(5);          }      }  }

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {      //利用写空闲发送心跳检测消息      @Override      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {          if (evt instanceof IdleStateEvent) {              IdleStateEvent e = (IdleStateEvent) evt;              switch (e.state()) {                  case WRITER_IDLE:                      PingMsg pingMsg=new PingMsg();                      ctx.writeAndFlush(pingMsg);                      System.out.println("send ping to server----------");                      break;                  default:                      break;              }          }      }      @Override      protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {          MsgType msgType=baseMsg.getType();          switch (msgType){              case LOGIN:{                  //向服务器发起登录                  LoginMsg loginMsg=new LoginMsg();                  loginMsg.setPassword("yao");                  loginMsg.setUserName("robin");                  channelHandlerContext.writeAndFlush(loginMsg);              }break;              case PING:{                  System.out.println("receive ping from server----------");              }break;              case ASK:{                  ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");                  ReplyMsg replyMsg=new ReplyMsg();                  replyMsg.setBody(replyClientBody);                  channelHandlerContext.writeAndFlush(replyMsg);              }break;              case REPLY:{                  ReplyMsg replyMsg=(ReplyMsg)baseMsg;                  ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();                  System.out.println("receive client msg: "+replyServerBody.getServerInfo());              }              default:break;          }          ReferenceCountUtil.release(msgType);      }  }

bootstrap

public class NettyClientBootstrap {      private int port;      private String host;      private SocketChannel socketChannel;      private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);      public NettyClientBootstrap(int port, String host) throws InterruptedException {          this.port = port;          this.host = host;          start();      }      private void start() throws InterruptedException {          EventLoopGroup eventLoopGroup=new NioEventLoopGroup();          Bootstrap bootstrap=new Bootstrap();          bootstrap.channel(NioSocketChannel.class);          bootstrap.option(ChannelOption.SO_KEEPALIVE,true);          bootstrap.group(eventLoopGroup);          bootstrap.remoteAddress(host,port);          bootstrap.handler(new ChannelInitializer<SocketChannel>() {              @Override              protected void initChannel(SocketChannel socketChannel) throws Exception {                  socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));                  socketChannel.pipeline().addLast(new ObjectEncoder());                  socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));                  socketChannel.pipeline().addLast(new NettyClientHandler());              }          });          ChannelFuture future =bootstrap.connect(host,port).sync();          if (future.isSuccess()) {              socketChannel = (SocketChannel)future.channel();              System.out.println("connect server  成功---------");          }      }      public static void main(String[]args) throws InterruptedException {          Constants.setClientId("001");          NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");            LoginMsg loginMsg=new LoginMsg();          loginMsg.setPassword("yao");          loginMsg.setUserName("robin");          bootstrap.socketChannel.writeAndFlush(loginMsg);          while (true){              TimeUnit.SECONDS.sleep(3);              AskMsg askMsg=new AskMsg();              AskParams askParams=new AskParams();              askParams.setAuth("authToken");              askMsg.setParams(askParams);              bootstrap.socketChannel.writeAndFlush(askMsg);          }      }  }

具体的例子和相应pom.xml 见 https://github.com/WangErXiao/ServerClient

转发请注明来源:http://my.oschina.net/robinyao/blog/399060