如何手写一个轻量级消息队列?
xjtuxjt
8年前
<h2>写在前面</h2> <p>最近因为项目需要,自己写了个单生产者-多消费者的消息队列模型。多线程真的不是等闲之辈能玩儿的,我花了两个小时进行设计与编码,却花了两天的时间调试与运行。在这里,我把我遇到的坑与大家分享。</p> <h2>需求的由来</h2> <p>一开始我需要实现一个记录用户操作日志的功能,目的是给商家用户提供客户行为分析的能力。要记录的信息包括客户的访问时间、IP、在网站上所做的操作等。其中,客户的地域信息是个重要的分析项,所以必须要把IP转化成省市县。那么究竟何时完成这个转化的动作呢?有两种方案:</p> <p>1. 在用户进行数据分析 <strong>时</strong> 完成转化</p> <p>2. 在用户进行数据分析 <strong>前</strong> 完成转化</p> <p>第一种方案显然不靠谱,因为需要转化的IP数量很大,而且转化采用第三方接口,因此整个转化过程将持续很长很长很长……的时间。</p> <p>而在分析前就把转化过程完成掉,这样当用户需要分析的时候就可以减少这部分时间的开销,提高了响应速度。因此第二种方案显然比较合理。</p> <p>那么随之而来的问题是:究竟在数据分析前的哪个时机进行转化?</p> <p>这个问题又有两种方案:</p> <p>1. 在记录日志的时候就立即完成IP向省市县的转换;</p> <p>2. 每天半夜服务器统一把当天的IP转化成省市县;</p> <p>这两种方案应该来说各有千秋。</p> <p>第一种方案比较消耗服务器资源,因为IP向省市县转化需要向第三方接口发送GET请求,因此需要消耗一定的出口带宽和内存资源,在服务器资源一定的前提下,分给用户访问的资源就会被减少,从而可能会影响请求响应速度。但这个问题可以用钱来解决,只要花钱砸服务器就行了;而第二种方案在服务器空闲的时候进行转化虽然节约了服务器资源,但这也导致了商家的分析结果会有一天的滞后,影响用户体验。</p> <p>于是,这个问题就变成了老板的钱重要还是用户体验重要。因此我毫不犹豫地选择了第一种方案。</p> <h2>初步设计</h2> <p>我使用Servlet Filter拦截用户的所有请求,并在Filter中获取用户的各项信息(其中包括IP),然后再请求第三方接口,完成IP向省市县的转化,最后将这些信息入库。</p> <p>这个流程很显然有重大缺陷:请求响应时间将被拉的很长。</p> <p>因为Filter是同步的,只有当Filter中的任务完成后才会放行用户的请求,而这个Filter中有两处耗时操作:请求第三方接口、数据入库,这无疑增加了用户的等待时间。</p> <p>因此,我需要将耗时操作异步执行,减少Filter的阻塞时间。</p> <p>我把这两个耗时操作放入一个新线程中,只要请求一来,就创建一条新线程去处理这两步操作。和先前的方式比对之后发现,确实响应速度提高了不少!</p> <p>但仔细一想,发现不妙。这种方式没办法控制线程的数量,当访问量很高的情况下,线程数量将会无限增加,这时候会搞垮服务器的!</p> <p>所以需要一个机制来管理所有的线程,于是我就设计了一个消息队列模型。</p> <h2>模型设计</h2> <p style="text-align:center"><img src="https://simg.open-open.com/show/bd5e163706144dc0a22dde810082f2c9.png"></p> <p>这个模型很简单,由一个任务队列和多个工作线程组成。生产者只需不停地往任务队列中添加任务,消费者(工作线程)不停地从任务队列的另一端取任务执行。</p> <p>这个模型在项目中的应用是这样的:当一个请求被Filter拦截后,Filter从请求中获取用户的各项信息,然后把这些信息封装成一个任务对象,扔给任务队列,此刻这个Filter的使命就完成了,它完全不用管任务的执行过程。工作线程会不停地从任务队列中取任务执行。</p> <h2>类图设计</h2> <p style="text-align:center"><img src="https://simg.open-open.com/show/70d038ce65e318b02faf6c62e527b32b.png"></p> <p>从代码层面来看,整个消息队列由三个类构成:</p> <h2>消息队列类MsgQueue</h2> <p>这个类管理整个消息队列的运行,是主控程序,它包含以下方法:</p> <ul> <li> <p>init:初始化整个消息队列</p> <p>在初始化过程中,它会依次做以下事情:</p> <ol> <li>创建一个任务队列</li> <li>调用initWorkThread函数,创建指定数量的工作线程(工作线程一旦被创建,就会不停地读取任务队列中的任务)</li> <li>调用loadTask函数,从数据库中加载所有任务</li> </ol> </li> <li> <p>loadTask:加载数据库中的所有任务</p> <p>这是一个抽象函数,若要使用这个消息队列,必须实现这个函数。</p> <p>消息队列初始化的时候会调用这个函数,从数据库中加载上次没有执行完的任务。</p> <p>作为消息队列来讲,它并不知道你提供的任务是啥,因此它没办法知道你的任务应该存在哪里,以何种形式存储?因此,这个过程就需要让消息队列使用者自己去实现。</p> </li> <li> <p>saveTask:持久化当前任务队列中的任务</p> <p>这也是个抽象函数,若要使用这个消息队列,也必须实现这个函数。</p> <p>当使用者调用消息队列的stop函数时,它会被执行,用于存储当前消息队列中尚未被执行的任务,并且在下次启动消息队列的时候通过loadTask函数再次加载进任务队列,这样能确保所有任务不会被遗漏。</p> </li> <li> <p>addTask:向任务队列添加一个任务</p> </li> <li>stop:停止所有工作线程</li> <li>initWorkThread:初始化所有工作线程<br> 这是一个私有函数,当初始化整个消息队列的时候被init函数调用。</li> </ul> <h2>工作线程类WorkThread</h2> <p>工作线程会不断地检查任务队列中是否有任务,若有任务,就会取一个任务执行;若没有任务,就会等待一定时间后再次检查。</p> <p>它是MsgQueue的一个内部类。因为WorkThread的行为完全由MsgQueue管理,外界不需要知道它的存在。</p> <h2>任务类Task</h2> <p>它是一个接口,并且只有一个函数run,用于封装任务具体的执行过程。</p> <h2>附上代码</h2> <p>以下代码还没将消息队列单独抽象出来,相当于是一个专门用于IP向省市县转化的消息队列,有空把它整一下。</p> <p>代码中有详细的注释来解释线程安全性问题。</p> <ul> <li>消息队列主控程序</li> </ul> <pre> <code class="language-java">package com.sdata.foundation.web.filter; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import javax.servlet.FilterConfig; import javax.servlet.ServletContext; import org.apache.log4j.Logger; import org.springframework.web.context.support.WebApplicationContextUtils; import org.springframework.web.context.support.XmlWebApplicationContext; import com.sdata.foundation.web.service.util.DelDataUtilService; import com.sdata.foundation.web.service.util.InsertDataUtilService; import com.sdata.foundation.web.service.util.QueryDataUtilService; import com.thinkgem.jeesite.modules.sys.service.LogService; /** * 记录请求IP的消息队列 * @author Chai * */ public class RecordLocationMQ { // 工作线程的个数 private static int MaxWorkThread; // 工作线程队列 private static List<WorkThread> workThreadQueue = new ArrayList<WorkThread>(); // 任务队列(存放等待执行的任务) private static List<RecordLocationTask> msgQueue = new ArrayList<RecordLocationTask>(); // 控制所有工作线程的运行与否 private static boolean isRunning = true; private static LogService LogService; // 数据库查询的service(用于任务的持久化) private static QueryDataUtilService QueryService; // 数据库删除的service(用于任务的持久化) private static DelDataUtilService DelService; // 数据库插入的service(用于任务的持久化) private static InsertDataUtilService InsertService; // 日志 private static final Logger logger = Logger.getLogger(RecordLocationMQ.class); // 一些常量 private static final int SUCCESS = 1; private static final int FAIL = 0; /** * 本消息队列的初始化函数 * @param config 用于获取数据库操作的service */ public static void init ( FilterConfig config ) { RecordLocationMQ.init( 10, config ); } /** * 本消息队列的初始化函数 * @param MaxWorkThread 工作线程的个数 * @param config 用于获取数据库操作的service */ public static void init ( int MaxWorkThread, FilterConfig config ) { RecordLocationMQ.MaxWorkThread = MaxWorkThread; // 初始化LogService if (null == RecordLocationMQ.LogService) { ServletContext sc = config.getServletContext(); XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc); if (cxt != null && cxt.getBean("logService") != null) { RecordLocationMQ.LogService = (LogService) cxt.getBean("logService"); } } // 初始化QueryService if (null == RecordLocationMQ.QueryService) { ServletContext sc = config.getServletContext(); XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc); if (cxt != null && cxt.getBean("queryDataUtilService") != null) { RecordLocationMQ.QueryService = (QueryDataUtilService) cxt.getBean("queryDataUtilService"); } } // 初始化DelService if (null == RecordLocationMQ.DelService) { ServletContext sc = config.getServletContext(); XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc); if (cxt != null && cxt.getBean("delDataUtilService") != null) { RecordLocationMQ.DelService = (DelDataUtilService) cxt.getBean("delDataUtilService"); } } // 初始化InsertService if (null == RecordLocationMQ.InsertService) { ServletContext sc = config.getServletContext(); XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc); if (cxt != null && cxt.getBean("insertDataUtilService") != null) { RecordLocationMQ.InsertService = (InsertDataUtilService) cxt.getBean("insertDataUtilService"); } } // 从DB中加载尚未完成的任务 // PS:在新线程中执行,防止tomcat启动时间过长 new RecordLocationMQ().new loadTaskThread().start(); // 初始化工作线程,并开始工作 initWorkThread( MaxWorkThread, workThreadQueue ); } /** * 初始化工作线程,并开始工作 * @param maxWorkThread 工作线程数量 * @param workThreadQueue 工作线程队列 */ private static void initWorkThread(int maxWorkThread, List<WorkThread> workThreadQueue) { for ( int i=0; i<maxWorkThread; i++ ) { WorkThread workThread = new RecordLocationMQ().new WorkThread("WorkThread"+(i+1)); workThreadQueue.add( workThread ); workThread.start(); System.out.println("已开启线程:WorkThread"+(i+1)); } } /** * 从DB中加载尚未完成的任务 * 并插入传入的消息队列中 * @param msgQueue * @param logger * @param logService */ private static void loadTask ( List<RecordLocationTask> msgQueue, QueryDataUtilService QueryService, DelDataUtilService DelService ) { String querySQL = "select * from sys_log_temp"; String delSQL = "delete from sys_log_temp"; // 查询DB中的任务 try { List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) ); for ( Map<String,Object> map : queryResultList ) { String ip = map.get("ip").toString(); String logId = map.get("log_id").toString(); if ( null!=ip && null!=logId ) { RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) ); } } } // 查询失败,不能执行delte操作 catch (Exception e) { e.printStackTrace(); return; } // 清空DB中的任务 DelService.del( Arrays.asList( delSQL ) ); } /** * 持久化当前任务队列 */ public static void saveTask () { // PS1:为什么要使用同步? // Java不允许在遍历集合过程中新增/删除元素, // 因此在遍历任务队列前必须要先冻结任务队列, // 防止其他线程新增/删除元素; // 此外,为了冻结任务队列,就必须使用msgQueue锁。 // PS2:为何要先使isRunning为false? // 将isRunning设为false能立即停止所有工作线程(见PS3), // 从而所有工作线程都将释放msgQueue锁, // 从而确保这里的同步块能顺利拿到msgQueue锁。 // 若不执行isRunning = false的话, // 所有工作线程就会继续执行, // 如果任务队列为空,工作线程就会一直持有msgQueue锁,并等待任务的到来, // 然而添加任务的功能得在当前同步块执行完成后才会执行, // 因此就出现了死锁。 // PS3:为何将isRunning设为false就能立即停止所有工作线程? // 因为isRunning为工作线程的共享资源, // 并且工作线程的运行依赖于它的值; // 因此当isRunning设为false后, // 工作线程执行完当前任务或发现任务队列为空后,就会纷纷停止。 isRunning = false; synchronized ( msgQueue ) { for ( RecordLocationTask task : msgQueue ) { if ( task!=null && !task.persisted ) { int result = InsertService.insert( Arrays.asList( "insert into sys_log_temp (id, ip, log_id) values('"+new Date().getTime()+"', '"+task.getIp()+"', '"+task.getLogId()+"')" ) ); if ( result == SUCCESS ) { task.persisted = true; } } } isRunning = true; } } /** * 向任务队列中添加一个任务 * @param task 任务对象 */ public static void addTask ( RecordLocationTask task ) { // 添加任务 // PS:加判断的原因:由于当前这个类的这个函数是提供给别人使用的, // 我们没办法保证别人一定会传入一个非空的task, // 因此加个判断能提高程序的健壮性。 if ( task!=null ) { // PS1:加同步块的原因:由于msgQueue是ArrayList类型, // ArrayList所有函数都是线程不安全的, // 这里加一个同步块使add函数具有原子性。 // PS2:千万不能使用msgQueue作为锁! // 因为工作线程获取一个任务的过程,使用的锁就是msgQueue, // 并且在这个过程中,如果任务队列为空就会一直循环等待, // 因此在等待的过程中工作线程就一直占用的msgQueue锁; // 然而如果这里添加任务还需要msgQueue锁,那么就会出现死锁, // 工作线程因为任务队列为空就一直占用着msgQueue锁, // 而添加任务的进程获取不到msgQueue锁就无法添加任务。 synchronized ( new Object() ) { msgQueue.add(task); // System.out.println("向消息队列添加了一条task!"); } } // 持久化任务队列 // PS:不使用同步的原因:这里对于数据的实时性要求没那么高。 if ( msgQueue.size() > 100 ) { saveTask(); } } public static void stop () { isRunning = false; } /** * 工作线程内部类 */ private class WorkThread extends Thread { public WorkThread ( String threadName ) { super(threadName); } @Override public void run() { RecordLocationTask task = null; while ( isRunning ) { // 获取一个任务 synchronized ( msgQueue ) { // 任务队列为空,则等待 while ( isRunning && msgQueue.isEmpty() ) { // System.out.println("消息队列为空!"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } // 取一个任务 // PS:加判断的原因:上述while循环的结束有两种可能: // 1.msgQueue不为空; // 2.isRunning为false // 因此要加判断排除msgQueue为空,但isRunning为false的情况, // 防止msgQueue.remove时出现空指针! if ( !msgQueue.isEmpty() ) { task = msgQueue.remove(0); } // System.out.println(this.getName() + "取了一个task!"); } // 执行任务 // PS1:加try-catch的原因:捕获任务执行过程中发生的一切异常, // 只要发生异常,就说明该任务执行失败, // 因此需要把它重新放进任务队列等待下一次执行。 // PS2:加判断的原因:同上述“取一个任务”加判断的原因一样。 try { if ( task!=null ) { task.run(); } } catch (Exception e) { // e.printStackTrace(); RecordLocationMQ.addTask( task ); // 使用addTask函数添加,统一添加的入口 } } } } /** * 从数据库加载任务的内部类 */ private class loadTaskThread extends Thread { @Override public void run() { String querySQL = "select * from sys_log_temp"; String delSQL = "delete from sys_log_temp"; // 查询DB中的任务 try { List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) ); for ( Map<String,Object> map : queryResultList ) { String ip = map.get("ip").toString(); String logId = map.get("log_id").toString(); if ( null!=ip && null!=logId ) { RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) ); } } } // 查询失败,不能执行delte操作 catch (Exception e) { e.printStackTrace(); return; } // 清空DB中的任务 DelService.del( Arrays.asList( delSQL ) ); } } // 禁用构造函数 private RecordLocationMQ () {} }</code></pre> <ul> <li>任务接口</li> </ul> <pre> <code class="language-java">package com.sdata.foundation.web.filter; public interface Task { public void run() throws Exception; }</code></pre> <ul> <li>用于IP向省市县转化的任务线程</li> </ul> <pre> <code class="language-java">package com.sdata.foundation.web.filter; import java.util.Date; import org.apache.log4j.Logger; import com.thinkgem.jeesite.common.utils.IdGen; import com.thinkgem.jeesite.modules.sys.entity.Log; import com.thinkgem.jeesite.modules.sys.service.LogService; public class RecordLocationTask implements Task { private static final Logger logger = Logger.getLogger(RecordLocationTask.class); private LogService logService; private String ip; private String logId; // public boolean persisted = false; public RecordLocationTask(String ip, String logId, LogService logService ) { super(); this.logService = logService; this.ip = ip; this.logId = logId; } @Override public void run() throws Exception { // 查询IP if ( (new Date().getTime() - TransferIPTool.lastOperaTime) < 100 ){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } String location = TransferIPTool.transferIP(ip); // 更新log Log log = new Log(); log.setIsNewRecord(false); log.setId(logId); log.setLocation(location); logService.save(log); System.out.println("完成一个task!"); } public String getIp() { return ip; } public String getLogId() { return logId; } }</code></pre> <ul> <li>用于IP向省市县转化的工具类</li> </ul> <pre> <code class="language-java">package com.sdata.foundation.web.filter; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; import com.alibaba.fastjson.JSONObject; import com.sdata.foundation.web.service.util.HTTPHelper; /** * * @author Chai * 本类用于将IP转化为位置信息 */ public class TransferIPTool { // 本次请求taobao接口的开始时间 public static long lastOperaTime = new Date().getTime(); // 模拟IP队列 private static List<String> IPList = Arrays.asList("49.65.250.135","115.28.217.42","114.80.166.240","122.92.218.0","218.28.191.23","218.12.41.179","221.239.16.227","59.108.49.35","124.117.66.0","218.21.128.31","116.52.147.50"); // IP转换接口 private static final String RequestIP = "http://ip.taobao.com/service/getIpInfo.php"; /** * 将IP转化为省份 * @param ip * @return 省份字符串 * @throws Exception */ public static String transferIP ( String ip ) throws Exception { // 记录本次请求taobao接口的开始时间 TransferIPTool.lastOperaTime = new Date().getTime(); // 打乱IPList Collections.shuffle( IPList ); String resultJsonStr = HTTPHelper.executeGet(RequestIP + "?ip=" + IPList.get(0)); JSONObject resultJsonObj = JSONObject.parseObject( resultJsonStr ); JSONObject data = resultJsonObj.getJSONObject("data"); return data.getString("region"); } }</code></pre> <p> </p> <p>来自:http://blog.csdn.net/u010425776/article/details/53837721</p> <p> </p>