如何手写一个轻量级消息队列?

xjtuxjt 8年前
   <h2>写在前面</h2>    <p>最近因为项目需要,自己写了个单生产者-多消费者的消息队列模型。多线程真的不是等闲之辈能玩儿的,我花了两个小时进行设计与编码,却花了两天的时间调试与运行。在这里,我把我遇到的坑与大家分享。</p>    <h2>需求的由来</h2>    <p>一开始我需要实现一个记录用户操作日志的功能,目的是给商家用户提供客户行为分析的能力。要记录的信息包括客户的访问时间、IP、在网站上所做的操作等。其中,客户的地域信息是个重要的分析项,所以必须要把IP转化成省市县。那么究竟何时完成这个转化的动作呢?有两种方案:</p>    <p>1. 在用户进行数据分析 <strong>时</strong> 完成转化</p>    <p>2. 在用户进行数据分析 <strong>前</strong> 完成转化</p>    <p>第一种方案显然不靠谱,因为需要转化的IP数量很大,而且转化采用第三方接口,因此整个转化过程将持续很长很长很长……的时间。</p>    <p>而在分析前就把转化过程完成掉,这样当用户需要分析的时候就可以减少这部分时间的开销,提高了响应速度。因此第二种方案显然比较合理。</p>    <p>那么随之而来的问题是:究竟在数据分析前的哪个时机进行转化?</p>    <p>这个问题又有两种方案:</p>    <p>1. 在记录日志的时候就立即完成IP向省市县的转换;</p>    <p>2. 每天半夜服务器统一把当天的IP转化成省市县;</p>    <p>这两种方案应该来说各有千秋。</p>    <p>第一种方案比较消耗服务器资源,因为IP向省市县转化需要向第三方接口发送GET请求,因此需要消耗一定的出口带宽和内存资源,在服务器资源一定的前提下,分给用户访问的资源就会被减少,从而可能会影响请求响应速度。但这个问题可以用钱来解决,只要花钱砸服务器就行了;而第二种方案在服务器空闲的时候进行转化虽然节约了服务器资源,但这也导致了商家的分析结果会有一天的滞后,影响用户体验。</p>    <p>于是,这个问题就变成了老板的钱重要还是用户体验重要。因此我毫不犹豫地选择了第一种方案。</p>    <h2>初步设计</h2>    <p>我使用Servlet Filter拦截用户的所有请求,并在Filter中获取用户的各项信息(其中包括IP),然后再请求第三方接口,完成IP向省市县的转化,最后将这些信息入库。</p>    <p>这个流程很显然有重大缺陷:请求响应时间将被拉的很长。</p>    <p>因为Filter是同步的,只有当Filter中的任务完成后才会放行用户的请求,而这个Filter中有两处耗时操作:请求第三方接口、数据入库,这无疑增加了用户的等待时间。</p>    <p>因此,我需要将耗时操作异步执行,减少Filter的阻塞时间。</p>    <p>我把这两个耗时操作放入一个新线程中,只要请求一来,就创建一条新线程去处理这两步操作。和先前的方式比对之后发现,确实响应速度提高了不少!</p>    <p>但仔细一想,发现不妙。这种方式没办法控制线程的数量,当访问量很高的情况下,线程数量将会无限增加,这时候会搞垮服务器的!</p>    <p>所以需要一个机制来管理所有的线程,于是我就设计了一个消息队列模型。</p>    <h2>模型设计</h2>    <p style="text-align:center"><img src="https://simg.open-open.com/show/bd5e163706144dc0a22dde810082f2c9.png"></p>    <p>这个模型很简单,由一个任务队列和多个工作线程组成。生产者只需不停地往任务队列中添加任务,消费者(工作线程)不停地从任务队列的另一端取任务执行。</p>    <p>这个模型在项目中的应用是这样的:当一个请求被Filter拦截后,Filter从请求中获取用户的各项信息,然后把这些信息封装成一个任务对象,扔给任务队列,此刻这个Filter的使命就完成了,它完全不用管任务的执行过程。工作线程会不停地从任务队列中取任务执行。</p>    <h2>类图设计</h2>    <p style="text-align:center"><img src="https://simg.open-open.com/show/70d038ce65e318b02faf6c62e527b32b.png"></p>    <p>从代码层面来看,整个消息队列由三个类构成:</p>    <h2>消息队列类MsgQueue</h2>    <p>这个类管理整个消息队列的运行,是主控程序,它包含以下方法:</p>    <ul>     <li> <p>init:初始化整个消息队列</p> <p>在初始化过程中,它会依次做以下事情:</p>      <ol>       <li>创建一个任务队列</li>       <li>调用initWorkThread函数,创建指定数量的工作线程(工作线程一旦被创建,就会不停地读取任务队列中的任务)</li>       <li>调用loadTask函数,从数据库中加载所有任务</li>      </ol> </li>     <li> <p>loadTask:加载数据库中的所有任务</p> <p>这是一个抽象函数,若要使用这个消息队列,必须实现这个函数。</p> <p>消息队列初始化的时候会调用这个函数,从数据库中加载上次没有执行完的任务。</p> <p>作为消息队列来讲,它并不知道你提供的任务是啥,因此它没办法知道你的任务应该存在哪里,以何种形式存储?因此,这个过程就需要让消息队列使用者自己去实现。</p> </li>     <li> <p>saveTask:持久化当前任务队列中的任务</p> <p>这也是个抽象函数,若要使用这个消息队列,也必须实现这个函数。</p> <p>当使用者调用消息队列的stop函数时,它会被执行,用于存储当前消息队列中尚未被执行的任务,并且在下次启动消息队列的时候通过loadTask函数再次加载进任务队列,这样能确保所有任务不会被遗漏。</p> </li>     <li> <p>addTask:向任务队列添加一个任务</p> </li>     <li>stop:停止所有工作线程</li>     <li>initWorkThread:初始化所有工作线程<br> 这是一个私有函数,当初始化整个消息队列的时候被init函数调用。</li>    </ul>    <h2>工作线程类WorkThread</h2>    <p>工作线程会不断地检查任务队列中是否有任务,若有任务,就会取一个任务执行;若没有任务,就会等待一定时间后再次检查。</p>    <p>它是MsgQueue的一个内部类。因为WorkThread的行为完全由MsgQueue管理,外界不需要知道它的存在。</p>    <h2>任务类Task</h2>    <p>它是一个接口,并且只有一个函数run,用于封装任务具体的执行过程。</p>    <h2>附上代码</h2>    <p>以下代码还没将消息队列单独抽象出来,相当于是一个专门用于IP向省市县转化的消息队列,有空把它整一下。</p>    <p>代码中有详细的注释来解释线程安全性问题。</p>    <ul>     <li>消息队列主控程序</li>    </ul>    <pre>  <code class="language-java">package com.sdata.foundation.web.filter;    import java.util.ArrayList;  import java.util.Arrays;  import java.util.Date;  import java.util.List;  import java.util.Map;    import javax.servlet.FilterConfig;  import javax.servlet.ServletContext;    import org.apache.log4j.Logger;  import org.springframework.web.context.support.WebApplicationContextUtils;  import org.springframework.web.context.support.XmlWebApplicationContext;    import com.sdata.foundation.web.service.util.DelDataUtilService;  import com.sdata.foundation.web.service.util.InsertDataUtilService;  import com.sdata.foundation.web.service.util.QueryDataUtilService;  import com.thinkgem.jeesite.modules.sys.service.LogService;    /**   * 记录请求IP的消息队列   * @author Chai   *   */  public class RecordLocationMQ {      // 工作线程的个数      private static int MaxWorkThread;      // 工作线程队列      private static List<WorkThread> workThreadQueue = new ArrayList<WorkThread>();      // 任务队列(存放等待执行的任务)      private static List<RecordLocationTask> msgQueue = new ArrayList<RecordLocationTask>();      // 控制所有工作线程的运行与否      private static boolean isRunning = true;      private static LogService LogService;      // 数据库查询的service(用于任务的持久化)      private static QueryDataUtilService QueryService;      // 数据库删除的service(用于任务的持久化)      private static DelDataUtilService DelService;      // 数据库插入的service(用于任务的持久化)      private static InsertDataUtilService InsertService;      // 日志      private static final Logger logger = Logger.getLogger(RecordLocationMQ.class);        // 一些常量      private static final int SUCCESS = 1;      private static final int FAIL = 0;            /**       * 本消息队列的初始化函数       * @param config 用于获取数据库操作的service       */      public static void init (  FilterConfig config ) {          RecordLocationMQ.init( 10, config );      }          /**       * 本消息队列的初始化函数       * @param MaxWorkThread 工作线程的个数       * @param config 用于获取数据库操作的service       */      public static void init ( int MaxWorkThread, FilterConfig config ) {            RecordLocationMQ.MaxWorkThread = MaxWorkThread;            // 初始化LogService          if (null == RecordLocationMQ.LogService) {              ServletContext sc = config.getServletContext();              XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);              if (cxt != null && cxt.getBean("logService") != null) {                  RecordLocationMQ.LogService = (LogService) cxt.getBean("logService");              }          }            // 初始化QueryService          if (null == RecordLocationMQ.QueryService) {              ServletContext sc = config.getServletContext();              XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);              if (cxt != null && cxt.getBean("queryDataUtilService") != null) {                  RecordLocationMQ.QueryService = (QueryDataUtilService) cxt.getBean("queryDataUtilService");              }          }            // 初始化DelService          if (null == RecordLocationMQ.DelService) {              ServletContext sc = config.getServletContext();              XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);              if (cxt != null && cxt.getBean("delDataUtilService") != null) {                  RecordLocationMQ.DelService = (DelDataUtilService) cxt.getBean("delDataUtilService");              }          }            // 初始化InsertService          if (null == RecordLocationMQ.InsertService) {              ServletContext sc = config.getServletContext();              XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);              if (cxt != null && cxt.getBean("insertDataUtilService") != null) {                  RecordLocationMQ.InsertService = (InsertDataUtilService) cxt.getBean("insertDataUtilService");              }          }            // 从DB中加载尚未完成的任务          // PS:在新线程中执行,防止tomcat启动时间过长          new RecordLocationMQ().new loadTaskThread().start();            // 初始化工作线程,并开始工作          initWorkThread( MaxWorkThread, workThreadQueue );      }          /**       * 初始化工作线程,并开始工作       * @param maxWorkThread 工作线程数量       * @param workThreadQueue 工作线程队列       */      private static void initWorkThread(int maxWorkThread, List<WorkThread> workThreadQueue) {          for ( int i=0; i<maxWorkThread; i++ ) {              WorkThread workThread = new RecordLocationMQ().new WorkThread("WorkThread"+(i+1));              workThreadQueue.add( workThread );              workThread.start();              System.out.println("已开启线程:WorkThread"+(i+1));          }      }          /**       * 从DB中加载尚未完成的任务       * 并插入传入的消息队列中       * @param msgQueue       * @param logger        * @param logService        */      private static void loadTask ( List<RecordLocationTask> msgQueue, QueryDataUtilService QueryService, DelDataUtilService DelService ) {            String querySQL = "select * from sys_log_temp";          String delSQL = "delete from sys_log_temp";            // 查询DB中的任务          try {              List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );              for ( Map<String,Object> map : queryResultList ) {                    String ip = map.get("ip").toString();                  String logId = map.get("log_id").toString();                    if ( null!=ip && null!=logId ) {                      RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );                  }              }          }           // 查询失败,不能执行delte操作          catch (Exception e) {              e.printStackTrace();              return;          }            // 清空DB中的任务          DelService.del( Arrays.asList( delSQL ) );      }          /**       * 持久化当前任务队列       */      public static void saveTask () {          // PS1:为什么要使用同步?          //    Java不允许在遍历集合过程中新增/删除元素,           //    因此在遍历任务队列前必须要先冻结任务队列,          //    防止其他线程新增/删除元素;          //    此外,为了冻结任务队列,就必须使用msgQueue锁。          // PS2:为何要先使isRunning为false?          //     将isRunning设为false能立即停止所有工作线程(见PS3),          //     从而所有工作线程都将释放msgQueue锁,          //     从而确保这里的同步块能顺利拿到msgQueue锁。          //     若不执行isRunning = false的话,          //     所有工作线程就会继续执行,          //     如果任务队列为空,工作线程就会一直持有msgQueue锁,并等待任务的到来,          //     然而添加任务的功能得在当前同步块执行完成后才会执行,          //     因此就出现了死锁。          // PS3:为何将isRunning设为false就能立即停止所有工作线程?          //     因为isRunning为工作线程的共享资源,          //     并且工作线程的运行依赖于它的值;          //     因此当isRunning设为false后,          //     工作线程执行完当前任务或发现任务队列为空后,就会纷纷停止。          isRunning = false;          synchronized ( msgQueue ) {              for ( RecordLocationTask task : msgQueue ) {                  if ( task!=null && !task.persisted ) {                      int result = InsertService.insert( Arrays.asList( "insert into sys_log_temp (id, ip, log_id) values('"+new Date().getTime()+"', '"+task.getIp()+"', '"+task.getLogId()+"')" ) );                      if ( result == SUCCESS ) {                          task.persisted = true;                      }                  }              }              isRunning = true;          }      }        /**       * 向任务队列中添加一个任务       * @param task 任务对象       */      public static void addTask ( RecordLocationTask task ) {            // 添加任务          // PS:加判断的原因:由于当前这个类的这个函数是提供给别人使用的,          //    我们没办法保证别人一定会传入一个非空的task,          //    因此加个判断能提高程序的健壮性。          if ( task!=null ) {              // PS1:加同步块的原因:由于msgQueue是ArrayList类型,              //     ArrayList所有函数都是线程不安全的,              //     这里加一个同步块使add函数具有原子性。              // PS2:千万不能使用msgQueue作为锁!              //     因为工作线程获取一个任务的过程,使用的锁就是msgQueue,              //     并且在这个过程中,如果任务队列为空就会一直循环等待,              //     因此在等待的过程中工作线程就一直占用的msgQueue锁;              //     然而如果这里添加任务还需要msgQueue锁,那么就会出现死锁,              //     工作线程因为任务队列为空就一直占用着msgQueue锁,              //     而添加任务的进程获取不到msgQueue锁就无法添加任务。              synchronized ( new Object() ) {                  msgQueue.add(task);                  // System.out.println("向消息队列添加了一条task!");              }          }            // 持久化任务队列          // PS:不使用同步的原因:这里对于数据的实时性要求没那么高。          if ( msgQueue.size() > 100 ) {               saveTask();          }      }          public static void stop () {          isRunning = false;      }          /**       * 工作线程内部类       */      private class WorkThread extends Thread {            public WorkThread ( String threadName ) {              super(threadName);          }            @Override          public void run() {              RecordLocationTask task = null;                while ( isRunning ) {                  // 获取一个任务                  synchronized ( msgQueue ) {                      // 任务队列为空,则等待                      while ( isRunning && msgQueue.isEmpty() ) {                          // System.out.println("消息队列为空!");                          try {                              Thread.sleep(200);                          } catch (InterruptedException e) {                              e.printStackTrace();                          }                      }                        // 取一个任务                      // PS:加判断的原因:上述while循环的结束有两种可能:                      //              1.msgQueue不为空;                      //              2.isRunning为false                      // 因此要加判断排除msgQueue为空,但isRunning为false的情况,                      // 防止msgQueue.remove时出现空指针!                      if ( !msgQueue.isEmpty() ) {                          task = msgQueue.remove(0);                      }                      // System.out.println(this.getName() + "取了一个task!");                  }                    // 执行任务                  // PS1:加try-catch的原因:捕获任务执行过程中发生的一切异常,                    //   只要发生异常,就说明该任务执行失败,                  //   因此需要把它重新放进任务队列等待下一次执行。                  // PS2:加判断的原因:同上述“取一个任务”加判断的原因一样。                  try {                      if ( task!=null ) {                          task.run();                      }                  } catch (Exception e) {                      // e.printStackTrace();                      RecordLocationMQ.addTask( task ); // 使用addTask函数添加,统一添加的入口                  }              }          }      }        /**       * 从数据库加载任务的内部类       */      private class loadTaskThread extends Thread {            @Override          public void run() {                String querySQL = "select * from sys_log_temp";              String delSQL = "delete from sys_log_temp";                // 查询DB中的任务              try {                  List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );                  for ( Map<String,Object> map : queryResultList ) {                        String ip = map.get("ip").toString();                      String logId = map.get("log_id").toString();                        if ( null!=ip && null!=logId ) {                          RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );                      }                  }              }               // 查询失败,不能执行delte操作              catch (Exception e) {                  e.printStackTrace();                  return;              }                // 清空DB中的任务              DelService.del( Arrays.asList( delSQL ) );            }      }        // 禁用构造函数      private RecordLocationMQ () {}  }</code></pre>    <ul>     <li>任务接口</li>    </ul>    <pre>  <code class="language-java">package com.sdata.foundation.web.filter;    public interface Task {      public void run() throws Exception;  }</code></pre>    <ul>     <li>用于IP向省市县转化的任务线程</li>    </ul>    <pre>  <code class="language-java">package com.sdata.foundation.web.filter;    import java.util.Date;    import org.apache.log4j.Logger;    import com.thinkgem.jeesite.common.utils.IdGen;  import com.thinkgem.jeesite.modules.sys.entity.Log;  import com.thinkgem.jeesite.modules.sys.service.LogService;    public class RecordLocationTask implements Task {      private static final Logger logger = Logger.getLogger(RecordLocationTask.class);      private LogService logService;      private String ip;      private String logId;      //       public boolean persisted = false;        public RecordLocationTask(String ip, String logId, LogService logService ) {          super();          this.logService = logService;          this.ip = ip;          this.logId = logId;      }        @Override      public void run() throws Exception {            // 查询IP          if ( (new Date().getTime() - TransferIPTool.lastOperaTime) < 100 ){              try {                  Thread.sleep(100);              } catch (InterruptedException e) {                  e.printStackTrace();              }          }            String location = TransferIPTool.transferIP(ip);            // 更新log          Log log = new Log();          log.setIsNewRecord(false);          log.setId(logId);          log.setLocation(location);          logService.save(log);            System.out.println("完成一个task!");        }        public String getIp() {          return ip;      }        public String getLogId() {          return logId;      }    }</code></pre>    <ul>     <li>用于IP向省市县转化的工具类</li>    </ul>    <pre>  <code class="language-java">package com.sdata.foundation.web.filter;    import java.util.Arrays;  import java.util.Collections;  import java.util.Date;  import java.util.List;    import com.alibaba.fastjson.JSONObject;  import com.sdata.foundation.web.service.util.HTTPHelper;    /**   *    * @author Chai   * 本类用于将IP转化为位置信息   */  public class TransferIPTool {        // 本次请求taobao接口的开始时间      public static long lastOperaTime = new Date().getTime();      // 模拟IP队列      private static List<String> IPList = Arrays.asList("49.65.250.135","115.28.217.42","114.80.166.240","122.92.218.0","218.28.191.23","218.12.41.179","221.239.16.227","59.108.49.35","124.117.66.0","218.21.128.31","116.52.147.50");      // IP转换接口      private static final String RequestIP = "http://ip.taobao.com/service/getIpInfo.php";          /**       * 将IP转化为省份       * @param ip       * @return 省份字符串       * @throws Exception        */      public static String transferIP ( String ip ) throws Exception {            // 记录本次请求taobao接口的开始时间          TransferIPTool.lastOperaTime = new Date().getTime();            // 打乱IPList          Collections.shuffle( IPList );            String resultJsonStr = HTTPHelper.executeGet(RequestIP + "?ip=" + IPList.get(0));          JSONObject resultJsonObj = JSONObject.parseObject( resultJsonStr );          JSONObject data = resultJsonObj.getJSONObject("data");          return data.getString("region");        }    }</code></pre>    <p> </p>    <p>来自:http://blog.csdn.net/u010425776/article/details/53837721</p>    <p> </p>