基于总线的消息服务(BBMS)的设计与实现

14年前

前言
异步事件的通知机制在比较有规模的软件设计中必然会有涉及,比如GUI程序中的事件监听器,应用程序模块之间的通信,以及分布式应用中的消息机制等。如果使用语言原生的对象注册通信,则耦合度不可避免的会增大,也就是说,当时间发生时,A要通知B,则A必须知道B的存在。耦合度的增大在一定程度上必然会影响灵活性。所以,另一种模式就是今天要说的总线模式(BUS Based),即所有的监听器将自己挂在总线上,这些监听器互相之间是无法直接通信的,它们可以向总线上push消息,或者从总线上得到消息,从而实现相互间的通信,当然,这种模式会在性能上有一定的额外开销。

BBMS的主页在google code上:http://code.google.com/p/bbms/

总线机制


bbms的客户端程序通过将自己注册在BUS Server上来等待异步事件,这个过程可以是本地的,也可以是远程的。本地的BUS可以作为GUI框架中的事件分发者 (dispatcher).JMS(Java Message Service)提供企业级的软件模块之间的通信机制,可以使得多个不同的应用集成为一个大型的应用。通过使用BBMS的远程接口,同样可以达到这样的效果。

BBMS的API

  1. /**  
  2.  *   
  3.  * @author juntao.qiu  
  4.  *  
  5.  */  
  6. public class Test{   
  7.     public static void main(String[] args) throws RemoteException{   
  8.         /*  
  9.          * create a notifiable entry, declare that it's care of  
  10.          * TIMEOUT, CLOSE, and READY event.  
  11.          */  
  12.         Configuration config = new RMIServerConfiguration(null0);   
  13.         CommonNotifiableEntry entry1 =    
  14.             new CommonNotifiableEntry(config, "client1",    
  15.                 MessageTypes.MESSAGE_TIMEOUT |    
  16.                 MessageTypes.MESSAGE_CLOSE |    
  17.                 MessageTypes.MESSAGE_READY);   
  18.            
  19.         /*  
  20.          * create another notifiable entry, declare that it's care of  
  21.          * OPEN, CLOSE, and TIMEOUT event.  
  22.          */  
  23.         CommonNotifiableEntry entry2 =    
  24.             new CommonNotifiableEntry(config, "client2",    
  25.                 MessageTypes.MESSAGE_OPEN |    
  26.                 MessageTypes.MESSAGE_CLOSE |    
  27.                 MessageTypes.MESSAGE_TIMEOUT);   
  28.            
  29.         // register them to the remote Message BUS to listener events   
  30.         entry1.register();   
  31.         entry2.register();   
  32.            
  33.         // new a message, of type MESSAGE_OPEN.   
  34.         Message msg = new CommonMessage(   
  35.                 entry1.getId(),   
  36.                 entry2.getId(),   
  37.                 MessageTypes.MESSAGE_OPEN,   
  38.                 "busying now");   
  39.            
  40.         // deliver it to entry2, which is from entry1   
  41.         entry1.post(msg);   
  42.            
  43.         // create a message, of type MESSAGE_CLICKED, the entry2   
  44.         // does not handle this type, it'll not be deliver to entry2   
  45.         Message msgCannotBeReceived = new CommonMessage(   
  46.                 entry1.getId(),   
  47.                 entry2.getId(),   
  48.                 MessageTypes.MESSAGE_CLICKED,   
  49.                 "cliked evnet");   
  50.         entry1.post(msgCannotBeReceived);   
  51.            
  52.         try {   
  53.             Thread.sleep(2000);   
  54.         } catch (InterruptedException e) {   
  55.             e.printStackTrace();   
  56.         }   
  57.            
  58.         // re use the message object to send another message entry   
  59.         msg.setSource(entry2.getId());   
  60.         msg.setTarget(entry1.getId());   
  61.         msg.setType(MessageTypes.MESSAGE_READY);   
  62.         msg.setBody("okay now");   
  63.         entry2.post(msg);   
  64.            
  65.         // unregister self when all works are done or    
  66.         // don't want to listen any more   
  67.         entry1.unregister();   
  68.         entry2.unregister();   
  69.     }   
  70. }   

API的设计,最好可以做到简单,易用。BBMS也尽力要做到这一点,每一个notifiable(可别通知的)的对象,可以将自己注册到BUS上,当消息抵达时,BUS管理器会调用这个对象上的update方法,进行通知。

  1. This is client2, get message from : client1, it said that : busying now   
  2. This is client1, get message from : client2, it said that : okay now   


这个是MS运行的一个简单流程图。

BUS的实现
BUS接口的定义,可以向BUS上注册一个notifiableEntry(可被通知的对象),或者卸载这个对象,同时,可以向BUS中post一条消息。

  1. package bbms.framework;   
  2.   
  3. /**  
  4.  * @author juntao.qiu  
  5.  */  
  6. public interface Bus extends java.rmi.Remote{   
  7.     /**  
  8.      * mount an notifiable entry on bus  
  9.      * @param entry  
  10.      */  
  11.     public void mount(NotifiableEntry entry) throws java.rmi.RemoteException;   
  12.        
  13.     /**  
  14.      * unmount the notifiable entry on bus  
  15.      * @param entry  
  16.      */  
  17.     public void unmount(NotifiableEntry entry) throws java.rmi.RemoteException;   
  18.        
  19.     /**  
  20.      * post a new message to Message Bus  
  21.      * @param message  
  22.      */  
  23.     public void post(Message message) throws java.rmi.RemoteException;   
  24. }   
  25.   

BUS的实现比较有意思,其中维护两个链表,一个是监听器链表,一个是消息链表,挂载在总线上的实体向BUS发送一条消息,这个过程会立即返回。因为发送消息的过程可能由于网络原因或其他原因而延迟,而消息的发送者没有必要等待消息的传递,所以BUS中有一个主动线程,这个线程在BUS中放入新的消息时被唤醒,并对监听器链表进行遍历,将消息分发出去。由于BUS是一个服务级的程序,所以这个主动线程被设计成为一个daemon线程,除非显式的退出或者出错,否则BUS将会一直运行。

  1.     /**  
  2.      *   
  3.      * @author juntao.qiu  
  4.      * worker thread, dispatch message to appropriate listener  
  5.      *  
  6.      */  
  7.     private class Daemon implements Runnable{   
  8.         private boolean loop = true;   
  9.         public void run(){   
  10.             while(loop){   
  11.                 if(messages.size() == 0){   
  12.                     synchronized(messages){   
  13.                         try {messages.wait();}    
  14.                         catch (InterruptedException e) {   
  15.                             e.printStackTrace();   
  16.                         }   
  17.                     }   
  18.                 }   
  19.                 processIncomingMessage();   
  20.             }   
  21.         }   
  22.     }   
  23. BUS中的内部工作者线程。它被作为一个Daemon线程:   
  24.     private MessageBus() throws RemoteException{   
  25.         listeners = new LinkedList<NotifiableEntry>();   
  26.         messages = new LinkedList<Message>();   
  27.         Daemon daemon = new Daemon();   
  28.         daemonThread = new Thread(daemon);   
  29.         daemonThread.setPriority(Thread.NORM_PRIORITY + 3);   
  30.         daemonThread.setDaemon(true);   
  31.         daemonThread.start();   
  32.            
  33.         while(!daemonThread.isAlive());   
  34.     }   
  35. 消息的定义   
  36. public interface Message{   
  37.     public int getType();   
  38.     public void setType(int type);   
  39.        
  40.     public String getTarget();   
  41.     public void setTarget(String target);   
  42.        
  43.     public String getSource();   
  44.     public void setSource(String source);   
  45.        
  46.     public Object getBody();   
  47.     public void setBody(Object body);   
  48. }   

为了更通用起见,消息体部分可以包含任何对象。消息类型参考了windows的消息机制,可以将消息进行复合:

  1. /*   
  2.  * 0x8000 = 1000 0000 0000 0000  
  3.  * 0x4000 = 0100 0000 0000 0000  
  4.  * 0x2000 = 0010 0000 0000 0000  
  5.  * 0x1000 = 0001 0000 0000 0000  
  6.  *    
  7.  * it's very useful when you want to combine some messages   
  8.  * together, and the user can simply determine what exactly   
  9.  * what you want. Refer the implementation of MessageBus.java   
  10.  * for more details.   
  11.  */   
  12. public static final int MESSAGE_TIMEOUT = 0x8000;   
  13. public static final int MESSAGE_CLICKED = 0x4000;   
  14. public static final int MESSAGE_CLOSE = 0x2000;   
  15. public static final int MESSAGE_OPEN = 0x1000;   
  16.   
  17. public static final int MESSAGE_READY = 0x0800;   
  18. public static final int MESSAGE_BUSY = 0x0400;   
  19. public static final int MESSAGE_WAIT = 0x0200;   
  20. public static final int MESSAGE_OKAY = 0x0100;   

总结
BBMS如果进行适当的扩展,可以完全实现JMS规范中涉及到的所有主题,如订阅模式(BBMS现在的实现中只有PTP模式,及点对点的模式,发送消息和接受消息的实体都必须同时在线)。BBMS主要面向的是轻量级的消息传递,比如GUI,分布式的GUI等。如果有兴趣,可以到BBMS 的页面上看一看:http://code.google.com/p/bbms/

来源:http://www.cnblogs.com/abruzzi/archive/2009/07/25/1531068.html,作者:abruzzi