RxJS 核心概念之Subject

peta28v5yli 9年前
   <p>什么是Subject?在RxJS中,Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer都有自己独立的执行环境),而Subject可以共享一个执行环境。</p>    <p>Subject是一种可以多路推送的可观察对象。与EventEmitter类似,Subject维护着自己的Observer。</p>    <p>每一个Subject都是一个Observable(可观察对象)对于一个Subject,你可以订阅( subscribe )它,Observer会和往常一样接收到数据。从Observer的视角看,它并不能区分自己的执行环境是普通Observable的单路推送还是基于Subject的多路推送。</p>    <p>Subject的内部实现中,并不会在被订阅( subscribe )后创建新的执行环境。它仅仅会把新的Observer注册在由它本身维护的Observer列表中,这和其他语言、库中的 addListener 机制类似。</p>    <p>每一个Subject也可以作为Observer(观察者)Subject同样也是一个由 next(v) , error(e) ,和 complete() 这些方法组成的对象。调用 next(theValue) 方法后,Subject会向所有已经在其上注册的Observer多路推送 theValue 。</p>    <p>下面的例子中,我们在Subject上注册了两个Observer,并且多路推送了一些数值:</p>    <pre>  <code class="language-javascript">var subject = new Rx.Subject();    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });  subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(1);  subject.next(2);</code></pre>    <p>控制台输出结果如下:</p>    <pre>  <code class="language-javascript">observerA: 1  observerB: 1  observerA: 2  observerB: 2</code></pre>    <p>既然Subject是一个Observer,你可以把它作为 subscribe (订阅)普通Observable时的参数,如下面例子所示:</p>    <pre>  <code class="language-javascript">var subject = new Rx.Subject();    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });  subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    var observable = Rx.Observable.from([1, 2, 3]);    observable.subscribe(subject); // 你可以传递Subject来订阅observable</code></pre>    <p>执行后结果如下:</p>    <pre>  <code class="language-javascript">observerA: 1  observerB: 1  observerA: 2  observerB: 2  observerA: 3  observerB: 3</code></pre>    <p>通过上面的实现:我们发现可以通过Subject将普通的Observable单路推送转换为多路推送。这说明了Subject的作用——作为单路Observable转变为多路Observable的桥梁。</p>    <p>还有几种特殊的 Subject 类型,分别是 BehaviorSubject , ReplaySubject ,和 AsyncSubject 。</p>    <h2>多路推送的Observable</h2>    <p>在以后的语境中,每当提到“多路推送的Observable”,我们特指通过Subject构建的Observable执行环境。否则“普通的Observable”只是一个不会共享执行环境并且被订阅后才生效的一系列值。</p>    <p>通过使用Subject可以创建拥有相同执行环境的多路的Observable。</p>    <p>下面展示了 多路 的运作方式:Subject从普通的Observable订阅了数据,然后其他Observer又订阅了这个Subject,示例如下:</p>    <pre>  <code class="language-javascript">var source = Rx.Observable.from([1, 2, 3]);  var subject = new Rx.Subject();  var multicasted = source.multicast(subject);    // 通过`subject.subscribe({...})`订阅Subject的Observer:  multicasted.subscribe({    next: (v) => console.log('observerA: ' + v)  });  multicasted.subscribe({    next: (v) => console.log('observerB: ' + v)  });    // 让Subject从数据源订阅开始生效:  multicasted.connect();</code></pre>    <p>multicast 方法返回一个类似于Observable的可观察对象,但是在其被订阅后,它会表现Subject的特性。 multicast 返回的对象同时是 ConnectableObservable 类型的,拥有 connect() 方法。</p>    <p>connect() 方法非常的重要,它决定Observable何时开始执行。由于调用 connect() 后,Observable开始执行,因此, connect() 会返回一个 Subscription 供调用者来终止执行。</p>    <h3>引用计数</h3>    <p>通过手动调用 connect() 返回的Subscription控制执行十分繁杂。通常,我们希望在有第一个Observer订阅Subject后 <em>自动</em> connnect ,当所有Observer都取消订阅后终止这个Subject。</p>    <p>我们来分析一下下面例子中subscription的过程:</p>    <ol>     <li> <p>第一个Observer 订阅了多路推送的 Observable</p> </li>     <li> <p>多路Observable被连接</p> </li>     <li> <p>向第一个Observer发送 值为 0 的 next 通知</p> </li>     <li> <p>第二个Observer订阅了多路推送的 Observable</p> </li>     <li> <p>向第一个Observer发送 值为 1 的 next 通知</p> </li>     <li> <p>向第二个Observer发送 值为 1 的 next 通知</p> </li>     <li> <p>第一个Observer取消了对多路推送的Observable的订阅</p> </li>     <li> <p>向第二个Observer发送 值为 2 的 next 通知</p> </li>     <li> <p>第二个Observer取消了对多路推送的Observable的订阅</p> </li>     <li> <p>取消对多路推送的Observable的连接</p> </li>    </ol>    <p>通过显式地调用 connect() ,代码如下:</p>    <pre>  <code class="language-javascript">var source = Rx.Observable.interval(500);  var subject = new Rx.Subject();  var multicasted = source.multicast(subject);  var subscription1, subscription2, subscriptionConnect;    subscription1 = multicasted.subscribe({    next: (v) => console.log('observerA: ' + v)  });  subscriptionConnect = multicasted.connect();    setTimeout(() => {    subscription2 = multicasted.subscribe({      next: (v) => console.log('observerB: ' + v)    });  }, 600);    setTimeout(() => {    subscription1.unsubscribe();  }, 1200);    setTimeout(() => {    subscription2.unsubscribe();    subscriptionConnect.unsubscribe();   }, 2000);</code></pre>    <p>如果你不想显式地调用 connect() 方法,可以在ConnectableObservable类型的Observable上调用 refCount() 方法。方法会进行引用计数:记录Observable被订阅的行为。当订阅数从 0 到 1 时 refCount() 会调用 connect() 方法。到订阅数从 1 到 0 ,他会终止整个执行过程。</p>    <p>refCount 使得多路推送的Observable在被订阅后自动执行,在所有观察者取消订阅后,停止执行。</p>    <p>下面是示例:</p>    <pre>  <code class="language-javascript">var source = Rx.Observable.interval(500);  var subject = new Rx.Subject();  var refCounted = source.multicast(subject).refCount();  var subscription1, subscription2, subscriptionConnect;    console.log('observerA subscribed');  subscription1 = refCounted.subscribe({    next: (v) => console.log('observerA: ' + v)  });    setTimeout(() => {    console.log('observerB subscribed');    subscription2 = refCounted.subscribe({      next: (v) => console.log('observerB: ' + v)    });  }, 600);    setTimeout(() => {    console.log('observerA unsubscribed');    subscription1.unsubscribe();  }, 1200);    setTimeout(() => {    console.log('observerB unsubscribed');    subscription2.unsubscribe();  }, 2000);</code></pre>    <p>执行输出结果如下:</p>    <pre>  <code class="language-javascript">observerA subscribed  observerA: 0  observerB subscribed  observerA: 1  observerB: 1  observerA unsubscribed  observerB: 2  observerB unsubscribed</code></pre>    <p>只有ConnectableObservables拥有 refCount() 方法,调用后会返回一个 Observable 而不是新的ConnectableObservable。</p>    <h2>BehaviorSubject</h2>    <p>BehaviorSubject 是Subject的一个衍生类,具有“最新的值”的概念。它总是保存最近向数据消费者发送的值,当一个Observer订阅后,它会即刻从 BehaviorSubject 收到“最新的值”。</p>    <p>BehaviorSubjects非常适于表示“随时间推移的值”。举一个形象的例子,Subject表示一个人的生日,而Behavior则表示一个人的岁数。(生日只是一天,一个人的岁数会保持到下一次生日之前。)</p>    <p>下面例子中,展示了如何用 0 初始化BehaviorSubject,当Observer订阅它时, 0 是第一个被推送的值。紧接着,在第二个Observer订阅BehaviorSubject之前,它推送了 2 ,虽然订阅在推送 2 之后,但是第二个Observer仍然能接受到 2 :</p>    <pre>  <code class="language-javascript">var subject = new Rx.BehaviorSubject(0 /* 初始值 */);    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });    subject.next(1);  subject.next(2);    subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(3);</code></pre>    <p>输出结果如下:</p>    <pre>  <code class="language-javascript">observerA: 0  observerA: 1  observerA: 2  observerB: 2  observerA: 3  observerB: 3</code></pre>    <h2>ReplaySubject</h2>    <p>ReplaySubject 如同于 BehaviorSubject 是 Subject 的子类。通过 ReplaySubject 可以向新的订阅者推送旧数值,就像一个录像机 ReplaySubject 可以记录Observable的一部分状态(过去时间内推送的值)。</p>    <p>.一个 ReplaySubject 可以记录Observable执行过程中推送的多个值,并向新的订阅者回放它们。</p>    <p>你可以指定回放值的数量:</p>    <pre>  <code class="language-javascript">var subject = new Rx.ReplaySubject(3 /* 回放数量 */);    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });    subject.next(1);  subject.next(2);  subject.next(3);  subject.next(4);    subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(5);</code></pre>    <p>输出如下:</p>    <pre>  <code class="language-javascript">observerA: 1  observerA: 2  observerA: 3  observerA: 4  observerB: 2  observerB: 3  observerB: 4  observerA: 5  observerB: 5</code></pre>    <p>除了回放数量,你也可以以毫秒为单位去指定“窗口时间”,决定ReplaySubject记录多久以前Observable推送的数值。下面的例子中,我们把回放数量设置为 100 ,把窗口时间设置为 500 毫秒:</p>    <pre>  <code class="language-javascript">var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });    var i = 1;  setInterval(() => subject.next(i++), 200);    setTimeout(() => {    subject.subscribe({      next: (v) => console.log('observerB: ' + v)    });  }, 1000);</code></pre>    <p>第二个Observer接受到 3 (600ms), 4 (800ms) 和 5 (1000ms),这些值均在订阅之前的 500 毫秒内推送(窗口长度 1000ms - 600ms = 400ms < 500ms):</p>    <pre>  <code class="language-javascript">observerA: 1  observerA: 2  observerA: 3  observerA: 4  observerA: 5  observerB: 3  observerB: 4  observerB: 5  observerA: 6  observerB: 6  ...</code></pre>    <h2>AsyncSubject</h2>    <p>AsyncSubject是Subject的另外一个衍生类,Observable仅会在执行完成后,推送执行环境中的最后一个值。</p>    <pre>  <code class="language-javascript">var subject = new Rx.AsyncSubject();    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });    subject.next(1);  subject.next(2);  subject.next(3);  subject.next(4);    subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(5);  subject.complete();</code></pre>    <p>输出结果如下:</p>    <pre>  <code class="language-javascript">observerA: 5  observerB: 5</code></pre>    <p>AsyncSubject 与 <a href="/misc/goto?guid=4959672505903617957" rel="nofollow,noindex"> last() </a> 操作符相似,等待完成通知后推送执行过程的最后一个值。</p>    <p>来自: https://segmentfault.com/a/1190000005069851</p>