





  • 系统中存在一个消息服务(Message Service),即总线
  • 监听器对象,通过实现一个可被通知的对象的接口,将自己注册在消息服务上
  • 可被通知的对象可以向消息总线上post消息,就这个对象而言,它对其他注册在总线上的对象是一无所知的
  • 消息服务进行消息的调度和转发,将消息(事件)发送给指定的对象,从而传递这个异步事件





public class Test{   public static void main(String[] args) throws RemoteException{    /*     * 创建一个可被通知的对象(监听器), 这个监听器关注这样几个事件     * TIMEOUT, CLOSE, and READY     */    Configuration config = new RMIServerConfiguration(null, 0);    CommonNotifiableEntry entry1 =      new CommonNotifiableEntry(config, "client1",       MessageTypes.MESSAGE_TIMEOUT |       MessageTypes.MESSAGE_CLOSE |       MessageTypes.MESSAGE_READY);        /*     * 创建另一个监听器, 这个监听器关注这样几个事件     * OPEN, CLOSE, and TIMEOUT.     */    CommonNotifiableEntry entry2 =      new CommonNotifiableEntry(config, "client2",       MessageTypes.MESSAGE_OPEN |       MessageTypes.MESSAGE_CLOSE |       MessageTypes.MESSAGE_TIMEOUT);        // 将监听器挂在BUS上    entry1.register();    entry2.register();        // 创建一个新的消息, MESSAGE_OPEN类型.    Message msg = new CommonMessage(      entry1.getId(),      entry2.getId(),      MessageTypes.MESSAGE_OPEN,      "busying now");        // 传递给entry2    entry1.post(msg);        // 创建一个MESSAGE_CLICKED类型的消息, entry2    // 不关注这个类型的消息,所以此消息不会被传递    Message msgCannotBeReceived = new CommonMessage(      entry1.getId(),      entry2.getId(),      MessageTypes.MESSAGE_CLICKED,      "cliked evnet");    entry1.post(msgCannotBeReceived);        try {     Thread.sleep(2000);    } catch (InterruptedException e) {     e.printStackTrace();    }        // re use the message object to send another message entry    msg.setSource(entry2.getId());    msg.setTarget(entry1.getId());    msg.setType(MessageTypes.MESSAGE_READY);    msg.setBody("okay now");    entry2.post(msg);        // 卸载这些监听器,当程序退出,或者    // 或者监听器不在关注事件发生的时候    entry1.unregister();    entry2.unregister();   }  }





/**   *    * @author Abruzzi   *   */  public class MessageBus extends UnicastRemoteObject implements Bus{   private static MessageBus instance;   private List<NotifiableEntry> listeners;   private List<Message> messages;   private Thread daemonThread = null;      public static MessageBus getInstance() throws RemoteException{    if(instance == null){     instance = new MessageBus();    }    return instance;   }      private MessageBus() throws RemoteException{    listeners = new LinkedList<NotifiableEntry>();    messages = new LinkedList<Message>();    Daemon daemon = new Daemon();    daemonThread = new Thread(daemon);          daemonThread.setPriority(Thread.NORM_PRIORITY + 3);          daemonThread.setDaemon(true);          daemonThread.start();                    while(!daemonThread.isAlive());   }      /**    * mount notifiable object to listener list    */   public void mount(NotifiableEntry entry) throws RemoteException{    synchronized(listeners){     listeners.add(entry);     listeners.notifyAll();    }   }     /**    * unmount the special notifiable object from listener    */   public void unmount(NotifiableEntry entry) throws RemoteException{    synchronized(listeners){     listeners.remove(entry);     listeners.notifyAll();    }   }      /**    * post a new message into the bus    * @param message    */   public void post(Message message) throws RemoteException{    synchronized(messages){     messages.add(message);     messages.notifyAll();    }   }      /**    *     * @author Abruzzi    * worker thread, dispatch message to appropriate listener    *    */   private class Daemon implements Runnable{    private boolean loop = true;    public void run(){     while(loop){      if(messages.size() == 0){       synchronized(messages){        try {messages.wait();}         catch (InterruptedException e) {         e.printStackTrace();        }       }      }      processIncomingMessage();     }    }   }      /**    * process the incoming message, remove the first message from    * queue, and then check all listeners to see whether should     * deliver the message to or not.    */   private void processIncomingMessage(){    Message msg;    synchronized(messages){     msg = messages.remove(0);    }    String target = null;    int type = 0;    int mask = 0;    try {     target = msg.getTarget();     type = msg.getType();     if(target == MessageTypes.SENDTOALL){      for(NotifiableEntry entry : listeners){       mask = entry.getSense();       if((mask & type) == type){entry.update(msg);}      }     }else{      for(NotifiableEntry entry : listeners){       mask = entry.getSense();       if(entry.getId().equals(target) && (mask & type) == type){        entry.update(msg);       }      }     }    } catch (RemoteException e) {     e.printStackTrace();    }   }    }

消息总线是一个RMI对象,其中mount(), unmout(), post()等方法可以被远程调用。MessageBus维护两个列表,一个消息列表,一个监听器列表。当消息被post到总线上后,post会立即返回,然后工作线程启动,取出消息并将其分发到合适的监听器上。


P.S.我将这个项目托管在google code上了,叫BBMS(Bus Based Message Service),感兴趣的可以去看看:http://code.google.com/p/bbms/