RxJS Subject 详解

iafa6945 8年前
   <h2>Observer Pattern</h2>    <h3>观察者模式定义</h3>    <p>观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。</p>    <p>我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:</p>    <ul>     <li>期刊出版方 - 负责期刊的出版和发行工作</li>     <li>订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知</li>    </ul>    <p>在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/6746475b66da036126669987ca4edb90.png"></p>    <h3>观察者模式结构</h3>    <p style="text-align:center"><img src="https://simg.open-open.com/show/2337383f6c5bbf02e5c1b69b844eae0c.png"></p>    <h3>观察者模式实战</h3>    <p>Subject 类定义</p>    <pre>  <code class="language-javascript">class Subject {        constructor() {          this.observerCollection = [];      }        addObserver(observer) { // 添加观察者          this.observerCollection.push(observer);      }        deleteObserver(observer) { // 移除观察者          let index = this.observerCollection.indexOf(observer);          if(index >= 0) this.observerCollection.splice(index, 1);      }        notifyObservers() { // 通知观察者          this.observerCollection.forEach((observer)=>observer.notify());      }  }</code></pre>    <p>Observer 类定义</p>    <pre>  <code class="language-javascript">class Observer {      constructor(name) {          this.name = name;      }        notify() {          console.log(`${this.name} has been notified.`);      }  }</code></pre>    <p>使用示例</p>    <pre>  <code class="language-javascript">let subject = new Subject(); // 创建主题对象    let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'  let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'    subject.addObserver(observer1); // 注册观察者A  subject.addObserver(observer2); // 注册观察者B    subject.notifyObservers(); // 通知观察者    subject.deleteObserver(observer1); // 移除观察者A    subject.notifyObservers(); // 验证是否成功移除</code></pre>    <p>以上代码成功运行后控制台的输出结果:</p>    <pre>  <code class="language-javascript">semlinker has been notified.  lolo has been notified.  lolo has been notified.</code></pre>    <h2>Observable subscribe</h2>    <p>在介绍RxJS - Subject 之前,我们先来看个示例:</p>    <pre>  <code class="language-javascript">const interval$ = Rx.Observable.interval(1000).take(3);    interval$.subscribe({    next: value => console.log('Observer A get value: ' + value);  });    setTimeout(() => {    interval$.subscribe({        next: value => console.log('Observer B get value: ' + value);    });  }, 1000);</code></pre>    <p>以上代码运行后,控制台的输出结果:</p>    <pre>  <code class="language-javascript">Observer A get value: 0  Observer A get value: 1  Observer B get value: 0  Observer A get value: 2  Observer B get value: 1  Observer B get value: 2</code></pre>    <p>通过以上示例,我们可以得出以下结论:</p>    <ul>     <li>Observable 对象可以被重复订阅</li>     <li>Observable 对象每次被订阅后,都会重新执行</li>    </ul>    <p>上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:</p>    <pre>  <code class="language-javascript">function interval() {    setInterval(() => console.log('..'), 1000);  }    interval();    setTimeout(() => {    interval();  }, 1000);</code></pre>    <p>Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 Subject 来实现上述功能。</p>    <h2>自定义 Subject</h2>    <p>Subject 类定义</p>    <pre>  <code class="language-javascript">class Subject {         constructor() {          this.observers = [];      }        addObserver(observer) {           this.observers.push(observer);      }        next(value) {            this.observers.forEach(o => o.next(value));          }        error(error){           this.observers.forEach(o => o.error(error));      }        complete() {          this.observers.forEach(o => o.complete());      }  }</code></pre>    <p>使用示例</p>    <pre>  <code class="language-javascript">const interval$ = Rx.Observable.interval(1000).take(3);  let subject = new Subject();    let observerA = {      next: value => console.log('Observer A get value: ' + value),      error: error => console.log('Observer A error: ' + error),      complete: () => console.log('Observer A complete!')  };    var observerB = {      next: value => console.log('Observer B get value: ' + value),      error: error => console.log('Observer B error: ' + error),      complete: () => console.log('Observer B complete!')  };    subject.addObserver(observerA); // 添加观察者A  interval$.subscribe(subject); // 订阅interval$对象  setTimeout(() => {     subject.addObserver(observerB); // 添加观察者B  }, 1000);</code></pre>    <p>以上代码运行后,控制台的输出结果:</p>    <pre>  <code class="language-javascript">Observer A get value: 0  Observer A get value: 1  Observer B get value: 1  Observer A get value: 2  Observer B get value: 2  Observer A complete!  Observer B complete!</code></pre>    <p>通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。</p>    <h2>RxJS Subject</h2>    <p>首先我们通过 RxJS Subject 来重写一下上面的示例:</p>    <pre>  <code class="language-javascript">const interval$ = Rx.Observable.interval(1000).take(3);  let subject = new Rx.Subject();    let observerA = {      next: value => console.log('Observer A get value: ' + value),      error: error => console.log('Observer A error: ' + error),      complete: () => console.log('Observer A complete!')  };    var observerB = {      next: value => console.log('Observer B get value: ' + value),      error: error => console.log('Observer B error: ' + error),      complete: () => console.log('Observer B complete!')  };    subject.subscribe(observerA); // 添加观察者A  interval$.subscribe(subject); // 订阅interval$对象  setTimeout(() => {     subject.subscribe(observerB); // 添加观察者B  }, 1000);</code></pre>    <h3>RxJS Subject 源码片段</h3>    <pre>  <code class="language-javascript">/**   * Suject继承于Observable    */  export class Subject extends Observable {      constructor() {          super();          this.observers = []; // 观察者列表          this.closed = false;          this.isStopped = false;          this.hasError = false;          this.thrownError = null;      }        next(value) {          if (this.closed) {              throw new ObjectUnsubscribedError();          }          if (!this.isStopped) {              const { observers } = this;              const len = observers.length;              const copy = observers.slice();              for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者                  copy[i].next(value);              }          }      }        error(err) {          if (this.closed) {              throw new ObjectUnsubscribedError();          }          this.hasError = true;          this.thrownError = err;          this.isStopped = true;          const { observers } = this;          const len = observers.length;          const copy = observers.slice();          for (let i = 0; i < len; i++) { // 循环调用观察者error方法              copy[i].error(err);          }          this.observers.length = 0;      }        complete() {          if (this.closed) {              throw new ObjectUnsubscribedError();          }          this.isStopped = true;          const { observers } = this;          const len = observers.length;          const copy = observers.slice();          for (let i = 0; i < len; i++) { // 循环调用观察者complete方法              copy[i].complete();          }          this.observers.length = 0; // 清空内部观察者列表      }  }</code></pre>    <p>通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:</p>    <ul>     <li>Subject 既是 Observable 对象,又是 Observer 对象</li>     <li>当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)</li>    </ul>    <h2>Angular 2 RxJS Subject 应用</h2>    <p>在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下:</p>    <p>message.service.ts</p>    <pre>  <code class="language-javascript">import { Injectable } from '@angular/core';  import { Observable } from 'rxjs';  import { Subject } from 'rxjs/Subject';    @Injectable()  export class MessageService {      private subject = new Subject<any>();        sendMessage(message: string) {          this.subject.next({ text: message });      }        clearMessage() {          this.subject.next();      }        getMessage(): Observable<any> {          return this.subject.asObservable();      }  }</code></pre>    <p>home.component.ts</p>    <pre>  <code class="language-javascript">import { Component } from '@angular/core';    import { MessageService } from '../_services/index';    @Component({      moduleId: module.id,      templateUrl: 'home.component.html'  })    export class HomeComponent {      constructor(private messageService: MessageService) {}        sendMessage(): void { // 发送消息          this.messageService.sendMessage('Message from Home Component to App Component!');      }        clearMessage(): void { // 清除消息          this.messageService.clearMessage();      }  }</code></pre>    <p>app.component.ts</p>    <pre>  <code class="language-javascript">import { Component, OnDestroy } from '@angular/core';  import { Subscription } from 'rxjs/Subscription';    import { MessageService } from './_services/index';    @Component({      moduleId: module.id,      selector: 'app',      templateUrl: 'app.component.html'  })    export class AppComponent implements OnDestroy {      message: any;      subscription: Subscription;        constructor(private messageService: MessageService) {          this.subscription = this.messageService.getMessage()                .subscribe(message => { this.message = message; });      }        ngOnDestroy() {          this.subscription.unsubscribe();      }  }</code></pre>    <p>以上示例实现的功能是组件之间消息通信,即 HomeComponent 子组件,向 AppComponent 父组件发送消息。代码运行后,浏览器的显示结果如下:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/e2de1b0566e4d6e746574a76ad0800cf.png"></p>    <p><a href="/misc/goto?guid=4959746442729209148" rel="nofollow,noindex">Plunker 示例</a></p>    <h2>Subject 存在的问题</h2>    <p>因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法,具体如下:</p>    <pre>  <code class="language-javascript">next(value) {          if (this.closed) {              throw new ObjectUnsubscribedError();          }          if (!this.isStopped) {              const { observers } = this;              const len = observers.length;              const copy = observers.slice();              for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者                  copy[i].next(value);              }          }  }</code></pre>    <p>这样会有一个大问题,如果某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例如下:</p>    <pre>  <code class="language-javascript">const source = Rx.Observable.interval(1000);  const subject = new Rx.Subject();    const example = subject.map(x => {      if (x === 1) {          throw new Error('oops');      }      return x;  });  subject.subscribe(x => console.log('A', x));  example.subscribe(x => console.log('B', x));  subject.subscribe(x => console.log('C', x));    source.subscribe(subject);</code></pre>    <p>以上代码运行后,控制台的输出结果:</p>    <pre>  <code class="language-javascript">A 0  B 0  C 0  A 1  Rx.min.js:74 Uncaught Error: oops</code></pre>    <p><a href="/misc/goto?guid=4959746442821002873" rel="nofollow,noindex">JSBin - Subject Problem Demo</a></p>    <p>在代码运行前,大家会认为观察者B 会在接收到 1 值时抛出异常,观察者 A 和 C 仍会正常运行。但实际上,在当前的 RxJS 版本中若观察者 B 报错,观察者 A 和 C 也会停止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为所有的观察者添加异常处理,更新后的代码如下:</p>    <pre>  <code class="language-javascript">const source = Rx.Observable.interval(1000);  const subject = new Rx.Subject();    const example = subject.map(x => {      if (x === 1) {          throw new Error('oops');      }      return x;  });    subject.subscribe(      x => console.log('A', x),      error => console.log('A Error:' + error)  );    example.subscribe(      x => console.log('B', x),      error => console.log('B Error:' + error)  );    subject.subscribe(      x => console.log('C', x),      error => console.log('C Error:' + error)  );    source.subscribe(subject);</code></pre>    <p><a href="/misc/goto?guid=4959746442903227914" rel="nofollow,noindex">JSBin - RxJS Subject Problem Solved Demo</a></p>    <h2>RxJS Subject & Observable</h2>    <p>Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。</p>    <p>Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:</p>    <ul>     <li>next - 每当 Subject 对象接收到新值的时候,next 方法会被调用</li>     <li>error - 运行中出现异常,error 方法会被调用</li>     <li>complete - Subject 订阅的 Observable 对象结束后,complete 方法会被调用</li>     <li>subscribe - 添加观察者</li>     <li>unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)</li>    </ul>    <h2>BehaviorSubject</h2>    <h3>BehaviorSubject 定义</h3>    <p><a href="/misc/goto?guid=4959746442984550855" rel="nofollow,noindex">BehaviorSubject 源码片段</a></p>    <pre>  <code class="language-javascript">export class BehaviorSubject extends Subject {      constructor(_value) { // 设置初始值          super();          this._value = _value;      }      get value() { // 获取当前值          return this.getValue();      }      _subscribe(subscriber) {          const subscription = super._subscribe(subscriber);          if (subscription && !subscription.closed) {              subscriber.next(this._value); // 为新的订阅者发送当前最新的值          }          return subscription;      }      getValue() {          if (this.hasError) {              throw this.thrownError;          }          else if (this.closed) {              throw new ObjectUnsubscribedError();          }          else {              return this._value;          }      }      next(value) { // 调用父类Subject的next方法,同时更新当前值          super.next(this._value = value);      }  }</code></pre>    <h3>BehaviorSubject 应用</h3>    <p>有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。具体我们先看一下示例:</p>    <pre>  <code class="language-javascript">var subject = new Rx.Subject();    var observerA = {      next: value => console.log('Observer A get value: ' + value),      error: error => console.log('Observer A error: ' + error),      complete: () => console.log('Observer A complete!')  };    var observerB = {      next: value => console.log('Observer B get value: ' + value),      error: error => console.log('Observer B error: ' + error),      complete: () => console.log('Observer B complete!')  };    subject.subscribe(observerA);    subject.next(1);  subject.next(2);  subject.next(3);    setTimeout(() => {    subject.subscribe(observerB); // 1秒后订阅  }, 1000);</code></pre>    <p>以上代码运行后,控制台的输出结果:</p>    <pre>  <code class="language-javascript">Observer A get value: 1  Observer A get value: 2  Observer A get value: 3</code></pre>    <p>通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next() 方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。</p>    <p>BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。接下来我们来使用 BehaviorSubject 重新一下上面的示例:</p>    <pre>  <code class="language-javascript">var subject = new Rx.BehaviorSubject(0); // 设定初始值    var observerA = {      next: value => console.log('Observer A get value: ' + value),      error: error => console.log('Observer A error: ' + error),      complete: () => console.log('Observer A complete!')  };    var observerB = {      next: value => console.log('Observer B get value: ' + value),      error: error => console.log('Observer B error: ' + error),      complete: () => console.log('Observer B complete!')  };    subject.subscribe(observerA);    subject.next(1);  subject.next(2);  subject.next(3);    setTimeout(() => {    subject.subscribe(observerB); // 1秒后订阅  }, 1000);</code></pre>    <p>以上代码运行后,控制台的输出结果:</p>    <pre>  <code class="language-javascript">Observer A get value: 0  Observer A get value: 1  Observer A get value: 2  Observer A get value: 3  Observer B get value: 3</code></pre>    <p><a href="/misc/goto?guid=4959746443075004696" rel="nofollow,noindex">JSBin - BehaviorSubject</a></p>    <h2>ReplaySubject</h2>    <h3>ReplaySubject 定义</h3>    <p><a href="/misc/goto?guid=4959746443158565883" rel="nofollow,noindex">ReplaySubject 源码片段</a></p>    <pre>  <code class="language-javascript">export class ReplaySubject extends Subject {      constructor(bufferSize = Number.POSITIVE_INFINITY,                   windowTime = Number.POSITIVE_INFINITY,                   scheduler) {          super();          this.scheduler = scheduler;          this._events = []; // ReplayEvent对象列表          this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 设置缓冲区大小          this._windowTime = windowTime < 1 ? 1 : windowTime;      }        next(value) {          const now = this._getNow();          this._events.push(new ReplayEvent(now, value));          this._trimBufferThenGetEvents();          super.next(value);      }      _subscribe(subscriber) {          const _events = this._trimBufferThenGetEvents(); // 过滤ReplayEvent对象列表          let subscription;          if (this.closed) {              throw new ObjectUnsubscribedError();          }          ...          else {              this.observers.push(subscriber);              subscription = new SubjectSubscription(this, subscriber);          }            ...          const len = _events.length;          // 重新发送设定的最后bufferSize个值          for (let i = 0; i < len && !subscriber.closed; i++) {              subscriber.next(_events[i].value);          }          ...          return subscription;      }  }    class ReplayEvent {      constructor(time, value) {          this.time = time;          this.value = value;      }  }</code></pre>    <h3>ReplaySubject 应用</h3>    <p>有些时候我们希望在 Subject 新增订阅者后,能向新增的订阅者重新发送最后几个值,这时我们就可以使用 ReplaySubject ,具体示例如下:</p>    <pre>  <code class="language-javascript">var subject = new Rx.ReplaySubject(2); // 重新发送最后2个值    var observerA = {      next: value => console.log('Observer A get value: ' + value),      error: error => console.log('Observer A error: ' + error),      complete: () => console.log('Observer A complete!')  };    var observerB = {      next: value => console.log('Observer B get value: ' + value),      error: error => console.log('Observer B error: ' + error),      complete: () => console.log('Observer B complete!')  };    subject.subscribe(observerA);    subject.next(1);  subject.next(2);  subject.next(3);    setTimeout(() => {    subject.subscribe(observerB); // 1秒后订阅  }, 1000);</code></pre>    <p>以上代码运行后,控制台的输出结果:</p>    <pre>  <code class="language-javascript">Observer A get value: 1  Observer A get value: 2  Observer A get value: 3  Observer B get value: 2  Observer B get value: 3</code></pre>    <p>可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。</p>    <p><a href="/misc/goto?guid=4959746443234889007" rel="nofollow,noindex">JSBin - ReplaySubject</a></p>    <h2>AsyncSubject</h2>    <h3>AsyncSubject 定义</h3>    <p><a href="/misc/goto?guid=4959746443324437220" rel="nofollow,noindex">AsyncSubject 源码片段</a></p>    <pre>  <code class="language-javascript">export class AsyncSubject extends Subject {      constructor() {          super(...arguments);          this.value = null;          this.hasNext = false;          this.hasCompleted = false; // 标识是否已完成      }      _subscribe(subscriber) {          if (this.hasError) {              subscriber.error(this.thrownError);              return Subscription.EMPTY;          }          else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值              subscriber.next(this.value);              subscriber.complete();              return Subscription.EMPTY;          }          return super._subscribe(subscriber);      }      next(value) {          if (!this.hasCompleted) { // 若未完成,保存当前的值              this.value = value;              this.hasNext = true;          }      }  }</code></pre>    <h3>AsyncSubject 应用</h3>    <p>AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:</p>    <pre>  <code class="language-javascript">var subject = new Rx.AsyncSubject();      var observerA = {      next: value => console.log('Observer A get value: ' + value),      error: error => console.log('Observer A error: ' + error),      complete: () => console.log('Observer A complete!')    };      var observerB = {      next: value => console.log('Observer B get value: ' + value),      error: error => console.log('Observer B error: ' + error),      complete: () => console.log('Observer B complete!')    };      subject.subscribe(observerA);      subject.next(1);    subject.next(2);    subject.next(3);      subject.complete();      setTimeout(() => {      subject.subscribe(observerB); // 1秒后订阅    }, 1000);</code></pre>    <p>以上代码运行后,控制台的输出结果:</p>    <pre>  <code class="language-javascript">Observer A get value: 3  Observer A complete!  Observer B get value: 3  Observer B complete!</code></pre>    <p><a href="/misc/goto?guid=4959746443404100362" rel="nofollow,noindex">JSBin - AsyncSubject</a></p>    <h2>参考资源</h2>    <ul>     <li><a href="/misc/goto?guid=4959746443490382158" rel="nofollow,noindex">Understanding Subjects in RxJS</a></li>     <li><a href="/misc/goto?guid=4959746443576918746" rel="nofollow,noindex">30 天精通 RxJS (22) - 什么是 Subject</a></li>     <li><a href="/misc/goto?guid=4959746443655383975" rel="nofollow,noindex">Communicating Between Components with Observable & Subject</a></li>    </ul>    <p> </p>    <p>来自:https://juejin.im/post/58dca39861ff4b006b03b80c</p>    <p> </p>