Spring框架4.0 M2 中的 WebSocket 消息架构
正如以前我所写的那样,WebSocket API只是WebSocket形式消息应用的起点。许多实际的挑战仍然存在。这也正是一个Tomcat邮件列表用户最近苦思冥想的:
确实对我来说Websocket仍然不是一个真正的“已经准备好的产品”,(我不讨论Tomcat本身的实现,只是更一般性地讨论)...IE里的本地WebSocket功能只是自IE-10开始才可用,而且允许低版本的IE运行WebSocket的方案是有些“不确定的”(例如转到取决于Adobe的FlashPlayer而不是取决于IE本身)。(我们的大多数客户都是相当大的公司,它们不打算更新浏览器,也不在防火墙上开放特殊的端口,只是请我们去做)。
Spring框架4.0的第一个发布版提供了SockJS服务器端的支持,以及最好的和最多的综合的WebSocket浏览器后备选项。在那些不支持WebSocket的浏览器里,我们需要这些后备选项。这种情况下网络代理将阻止使用WebSocket。今天,简单地把SockJS放置在浏览器里就能让你创建WebSocket应用,而且在必要的时候由未被察觉的后备选项决定该怎么做。
即使有后备选项,仍有巨大的挑战。一个socket是一个层次非常低的抽象,今天绝大多数web应用并不是针对socket编程的。这就是为什么WebSocket要定义一个子协议机制,从本质上启用,促进基于WebSocket的高等级协议的使用,就像我们基于TCP使用的HTTP。
这个Spring Framework 4.0的第二个里程碑,使得使用基于WebSocket的高等级消息协议成为可能。为了演示这一点,我们汇集出了一个示例程序。
股票投资组合案例(Stock Portfolio Sample)
股票组合案例应用, 可自Github获得, 加载一个用户的投资组合头寸,允许购买和出售股票,消费价格行情,以及显示位置更新。这是一个相当简单的应用。但是它解决了基于浏览器的消息应用可能会遇到的 许多一般性任务。
那么我们怎样建立起一个这样的应用呢?从使用HTTP和REST以后,我们就已习惯于基于URL和HTTP动作来表达需要做的事情。这里我们有一个socket和许多的消息。你又怎样去告诉谁一个消息,并说明这个消息的意思呢?
浏览器与服务器必须在语义表达以前,对公共的消息格式达成一致。有几个已存在的协议可以帮助做到这一点。我们为这个里程碑选择了STOMP,这归因于它的简单与广泛的支持。
面向简单的/流形式的文本消息协议(STOMP)
STOMP是为了简单而创建的一种消息协议。它基于模仿HTTP协议的帧。帧由一个命令、可选的头和可选的体组成。例如股票投资应用需要接收股票报价,因此客户端发送SUBSCRIBE帧,这帧头中的目的表明客户端打算订阅什么:
SUBSCRIBE id:sub-1 destination:/topic/price.stock.*
当股票报价有效时,服务器发送含有匹配目的和订阅id以及内容类型头和内容体的MESSAGE帧:
MESSAGE subscription:sub-1 message-id:wm2si1tj-4 content-type: application/json destination:/topic/stocks.PRICE.STOCK.NASDAQ.EMC {\"ticker\":\"EMC\",\"price\":24.19}为了在浏览器里实现这些,我们使用了 stomp.js 和 SockJS客户端:
varsocket =newSockJS('/spring-websocket-portfolio/portfolio'); varclient = Stomp.over(socket); varonConnect =function() { client.subscribe("/topic/price.stock.*",function(message) { // process quote }); }; client.connect('guest','guest', onConnect);这已经获得巨大收获!我们拥有了标准的消息格式和客户端支持。
现在我们把其中一个移动到服务器端。
消息代理方案
服务器端的一个选项是纯消息代理方案,这时消息可直接发送到传统的消息代理如RabbitMQ,ActiveMQ等。即便不是所有的消息代理都支持,大多数都支持TCP上的STOMP,不过它们也逐渐支持WebSocket上的STOMP,而随着RabbitMQ进一步发展,它也支持SockJS。我们的架构看起来如下:
这是一个可靠且可伸缩的的方案,然而可能不是最适合手边的这种问题。消息代理通常在企业内部使用。直接把它们暴露在互联网上不是理想的选择。
如果我们已经从REST里学习到了什么,那么它就是我们不打算暴露如数据库或者域模型这样的我们系统内部的细节。
另外,做为一名Java开发人员,你想应用安全、有效性以及添加应用逻辑。在消息代理方案里,应用服务器位于消息代理之后,这很大程度上违背了大多数互联网应用开发人员习惯的用法。
这就是像socket.io这样的库流行的原因。它简单且满足了开发互联网应用的需求。另一方面,我们必须不能忽视消息代理处理消息的能力,它们真正的擅长是处理消息,而消息路由是个难题。我们需要两者都最佳。
应用和消息代理方案
另一个方案是让应用处理进来的消息且做为互联网客户端和消息代理之间的中间人。来自客户端的消息通过应用流向代理,相反来自代理的消息通过应用返回给客户端。这给应用提供了检查进入的消息类型和“目的”头的机会,以确定是处理消息呢,还是传递消息给代理。
这就是我们选择的方案。为了更好的说明这个方案,下面有一些应用场景。
装载投资组合应用的状态
- 客户端请求投资组合应用的状态
- 应用装载并给订阅者返回数据的方式处理这个请求
- 这种交互没有涉及到消息代理
订阅股票报价
- 客户端发送股票报价的订阅请求
- 应用把这条消息传递给消息代理
- 消息代理传送消息给所有已经订阅的所有客户端
接收股票报价
- 报价服务发送股票报价信息给消息代理
- 消息代里传送消息给所有订阅的用户
执行交易
- 客户端发送交易请求
- 应用处理这个请求,通过交易服务提交所要执行的交易
- 这种交互里没有涉及到消息代理
接收位置更新
- 交易服务发送状态更新消息给消息代理上的队列
- 消息代理发送状态更新信息给客户端
- 更多发送消息给特定的客户的细节在下面
代码片段
让我们看看客户端和服务器端代码的一些例子。下面是请求投资组合应用状态的 portfolio.js:
stompClient.subscribe("/app/positions",function(message) { self.portfolio().loadPositions(JSON.parse(message.body)); });在服务器端,PortfolioController检测请求,然后返回投资组合应用的状态,这解释了互联网应用里非常普通的请求-应答交互。由于我们使用Spring Security来保护HTTP请求,它包括产生WebSocket握手的请求。下面的principal方法参数是从用户HttpServeletRequest的principal Spring Security集里提取出来的。
@Controller publicclassPortfolioController { // ... @SubscribeEvent("/app/positions") publicList<PortfolioPosition> getPortfolios(Principal principal) { String user = principal.getName(); Portfolio portfolio =this.portfolioService.findPortfolio(user); returnportfolio.getPositions(); } }下面发送交易请求的protfolio.js:
stompClient.send("/app/trade", {}, JSON.stringify(trade));在服务器端,PortfolioController发送执行的交易:
@Controller publicclassPortfolioController { // ... @MessageMapping(value="/app/trade") publicvoidexecuteTrade(Trade trade, Principal principal) { trade.setUsername(principal.getName()); this.tradeService.executeTrade(trade); } }PortfolioController还可以处理不期望的例外,并发送消息给用户。
@Controller publicclassPortfolioController { // ... @MessageExceptionHandler @ReplyToUser(value="/queue/errors") publicString handleException(Throwable exception) { returnexception.getMessage(); } }从应用内部发送消息给订阅的用户意味着什么呢?下面是报价服务如何发送报价的:
@Service publicclassQuoteService { privatefinalMessageSendingOperations<String> messagingTemplate; @Scheduled(fixedDelay=1000) publicvoidsendQuotes() { for(Quote quote :this.quoteGenerator.generateQuotes()) { String destination ="/topic/price.stock."+ quote.getTicker(); this.messagingTemplate.convertAndSend(destination, quote); } } }而下面是交易服务器在交易执行完成后是如何发送状态更新的:
@Service publicclassTradeService { // ... @Scheduled(fixedDelay=1500) publicvoidsendTradeNotifications() { for(TradeResult tr :this.tradeResults) { String queue ="/queue/position-updates"; this.messagingTemplate.convertAndSendToUser(tr.user, queue, tr.position); } } }而且以防你疑惑....不要疑惑,根据以前构建在线游戏应用的开发者在文档里所提的建议PortfolioController还可以包含Spring的MVC方法(例如@RequestMapping):
是的,把[消息]映射和Spring的MVC映射统一是很好的。没有理由不统一 它们。就像报价服务和交易服务一样,Spring的MVC控制器方法也可以发布消息。
Spring应用对消息处理的支持
Spring Integration已经为众所周知的企业集成模式和轻量级消息处理提供一流的抽象很长时间了。当我们在Spring Integration上面工作的时候,我们认识到后者才真正是我们需要构建的基础。
因此,我很高兴地宣布我们已经把精选出来的整合到Spring框架的Spring Integration原型转换为一个新的提前称为spring消息处理的模块。除了像Message,MessageChannel,MessageHandler以及其他核心抽象外,新的模块还包括所有支持这篇文章中所阐释的新特性的注释脚本和类。
有了这种思想,现在我们就可以看看股票投资组合应用的内部架构图:
StompWebSocketHandler把进入的客户端信息放到“分发”消息通道上。这个通道有3个订阅者。第一个把消息委派给注释方法,第二个转发消息给STOMP消息代理,而第三个通过转换目的为唯一的客户端订阅的队列名而处理发送给各个用户的消息(更详细的细节见下面)。
默认情况下应用是以一个“简单的“提供初始化选项的消息代理运行的。正如样例的README文件所说明的那样,你可以通过激活或者不激活配置文件来在在“简单的”和全功能的消息代理之间进行切换。
另一个可能更改的配置是从Executor切换到MessageChannel的消息传递的基于Reactor的实现。最近发布了第一个版本的Reactor项目还可以用来管理应用和消息代理之间的TCP连接。
你可以看到包括新Spring Security Java配置的全功能的应用配置。你也可能有兴趣Java配置里对改进的STS的支持。
发送消息给单个用户
明白如何广播消息给多个已订阅的客户是很容易的,只要发布消息给某个话题组。明白如何发送消息给特定的用户却是非常难。例如你可能捕捉到一个例外而且想发送错误消息。或者你可能接收到交易确认消息并想把这条消息发送给用户。
在传统的消息处理应用里,创建临时队列并在期望应答的消息上设置“rely-to"头是很常见的。然而在互联网应用里这个工作却相当难以处理。客户端必须记住在所有应用的消息上设置必须的消息头,而服务器应用可能需要跟踪并传递这个消息头。有时这样的信息可能完全不是那么容易用,比如,当把HTTP POST当作传递消息的另一个选项的时候。
为了支持这种需求,我们发送一个唯一的队列后缀给每个已经连接的客户端。然后添加这个后缀以创建唯一的队列名。client.connect('guest','guest',function(frame) { varsuffix = frame.headers['queue-suffix']; client.subscribe("/queue/error"+ suffix,function(msg) { // 处理错误 }); client.subscribe("/queue/position-updates"+ suffix,function(msg) { // 处理状态更新 }); });然后在服务器端,给@messageExceptionHandler方法(或者任何消息处理方法)增加一个@ReplyToUser注释脚本,用来以消息的形式发送返回值。
@MessageExceptionHandler @ReplyToUser(value="/queue/errors") publicString handleException(Throwable exception) { // ... }像交易服务(TradeService)这样的所有其他类都可以使用消息模板获得同样的功能。
String user ="fabrice"; String queue ="/queue/position-updates"; this.messagingTemplate.convertAndSendToUser(user, queue, position);在以上两种情况下,为了重新构造正确的队列名,我们内部(通过配置 用户队列后缀解析器)定位用户的队列后缀。目前只有一个简单的解释器实现了这个功能。然而不管用户是否连接到这个或者其他应用服务器,增加支持同样功能的 Redis实现是很简单的。
结论
但愿这篇文章是对新功能的有用的介绍。为了不使这篇文章过长,我鼓励你去看看这些例子,思考一下它对你正在写或者即将写的应用意味着什么。由于9月早期我将发布一个新的版本,因此现在是反馈的最佳时间。
SpringOne 2GX 2013即将来临
请你尽快订购在圣克拉拉召开的SpringOne大会的位子。它确实是获得正在开发的所有一切的一手资料和直接提供反馈的绝佳机会。希望今年有许多重要的新公告发布。看看最新的博客文档是否正如我所说,接下来还会有更多的文章发上来!