RxJS: 理解 publish 和 share 操作符

xiaojia 7年前
   <h3><sub>我经常会被问及 <code>publish</code> 操作符的相关问题:</sub></h3>    <h3><sub>publish 和 share 之间的区别是什么?<br> 如何导入 refCount 操作符?<br> 何时使用 AsyncSubject?</sub></h3>    <p>我们来解答这些问题,并让你了解到更多内容,首先从基础入手。</p>    <h2>多播的心智模型</h2>    <p>多播是一个术语,它用来描述由单个 observable 发出的每个通知会被多个观察者所接收的情况。一个 observable 是否具备多播的能力取决于它是热的还是冷的。</p>    <p>热的和冷的 observable 的特征在于 observable 通知的生产者是在哪创建的。在 Ben Lesh 的 <a href="/misc/goto?guid=4959756663129493503">热的 Vs 冷的 Observables</a> 一文中,他详细讨论了两者间的差异,这些差异可以归纳如下:</p>    <ul>     <li>如果通知的生产者是观察者订阅 observable 时创建的,那么 observable 就是冷的。例如,<code><a href="/misc/goto?guid=4959756663214689103" rel="nofollow noreferrer">timer</a></code> observable 就是冷的,每次订阅时都会创建一个新的定时器。</li>     <li>如果通知的生产者不少每次观察者订阅 observable 时创建的,那么 observable 就是热的。例如,使用 <code><a href="/misc/goto?guid=4959756663295516026" rel="nofollow noreferrer">fromEvent</a></code> 创建的 observable 就是热的,产生事件的元素存在于 DOM 之中,它不是观察者订阅时所创建的。</li>    </ul>    <p>冷的 observables 是单播的,每个观察者所接收到的通知都是来自不同的生产者,生产者是观察者订阅时所创建的。</p>    <p>热的 observables 是多播的,每个观察者所接收到的通知都是来自同一个生产者。</p>    <p>有些时候,需要冷的 observable 具有多播的行为,RxJS 引入了 <code>Subject</code> 类使之成为可能。</p>    <p>Subject 即是 observable,又是 observer (观察者)。通过使用观察者来订阅 subject,然后 subject 再订阅冷的 observable,可以让冷的 observable 变成热的。这是 RxJS 引入 subjects 的主要用途,在 Ben Lesh 的 <a href="/misc/goto?guid=4959756663383497312">关于 RxJS 中的 Subject</a> 一文中,他指出:</p>    <blockquote>    <strong>多播是 RxJS 中 Subjects 的主要用法。</strong>   </blockquote>    <p>我们来看下面的示例:</p>    <pre>  <code class="language-javascript">import { Observable } from "rxjs/Observable";  import { Subject } from "rxjs/Subject";  import "rxjs/add/observable/defer";  import "rxjs/add/observable/of";    const source = Observable.defer(() => Observable.of(    Math.floor(Math.random() * 100)  ));    function observer(name: string) {    return {      next: (value: number) => console.log(`observer ${name}: ${value}`),      complete: () => console.log(`observer ${name}: complete`)    };  }    const subject = new Subject<number>();  subject.subscribe(observer("a"));  subject.subscribe(observer("b"));  source.subscribe(subject);  </code></pre>    <p>示例中的 <code>source</code> 是冷的。每次观察者订阅 <code>source</code> 时,传给 <code><a href="/misc/goto?guid=4959756663463308342" rel="nofollow noreferrer">defer</a></code> 的工厂函数会创建一个发出随机数后完成的 observable 。</p>    <p>要让 <code>source</code> 变成多播的,需要观察者订阅 subject,然后 subject 再订阅 <code>source</code>。<code>source</code> 只会看到一个订阅 ( subscription ),它也只生成一个包含随机数的 <code>next</code> 通知和一个 <code>complete</code> 通知。Subject 会将这些通知发送给它的观察者,输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer b: 42  observer a: complete  observer b: complete  </code></pre>    <p>此示例可以作为 RxJS 多播的基本心智模型: 一个源 observable,一个订阅源 observable 的 subject 和多个订阅 subject 的观察者。</p>    <p> </p>    <h2>multicast 操作符和 ConnectableObservable</h2>    <p>RxJS 引入了 <code><a href="/misc/goto?guid=4959756663550599047" rel="nofollow noreferrer">multicast</a></code> 操作符,它可以应用于 observable ,使其变成热的。此操作符封装了 subject 用于多播 observable 时所涉及的基础结构。</p>    <p>在看 <code>multicast</code> 操作符之前,我们使用一个简单实现的 <code>multicast</code> 函数来替代上面示例中的 subject :</p>    <pre>  <code class="language-javascript">function multicast<T>(source: Observable<T>) {    const subject = new Subject<T>();    source.subscribe(subject);    return subject;  }    const m = multicast(source);  m.subscribe(observer("a"));  m.subscribe(observer("b"));  </code></pre>    <p>代码改变后,示例的输出如下:</p>    <pre>  <code class="language-javascript">observer a: complete  observer b: complete  </code></pre>    <p>这并不是我们想要的结果。在函数内部订阅 subject 使得 subject 在被观察者订阅之前就已经收到了 <code>next</code> 和 <code>complete</code> 通知,所以观察者只能收到 <code>complete</code> 通知。</p>    <p>这是可避免的,任何连接多播基础结构的函数的调用者需要能够在 subject 订阅源 observable 时进行控制。RxJS 的 <code>multicast</code> 操作符通过返回一个特殊的 observable 类型 <a href="/misc/goto?guid=4959756663637459325" rel="nofollow noreferrer">ConnectableObservable</a> 来实现的。</p>    <p>ConnectableObservable 封装了多播的基础结构,但它不会立即订阅源 observable ,只有当它的 <code>connect</code> 方法调用时,它才会订阅源 observable 。</p>    <p>我们来使用 <code>multicast</code> 操作符:</p>    <pre>  <code class="language-javascript">import { Observable } from "rxjs/Observable";  import { Subject } from "rxjs/Subject";  import "rxjs/add/observable/defer";  import "rxjs/add/observable/of";  import "rxjs/add/operator/multicast";    const source = Observable.defer(() => Observable.of(    Math.floor(Math.random() * 100)  ));    function observer(name: string) {    return {      next: (value: number) => console.log(`observer ${name}: ${value}`),      complete: () => console.log(`observer ${name}: complete`)    };  }    const m = source.multicast(new Subject<number>());  m.subscribe(observer("a"));  m.subscribe(observer("b"));  m.connect();  </code></pre>    <p>代码改变后,现在观察者可以收到 <code>next</code> 通知了:</p>    <pre>  <code class="language-javascript">observer a: 54  observer b: 54  observer a: complete  observer b: complete  </code></pre>    <p>调用 <code>connect</code> 时,传入 <code>multicast</code> 操作符的 subject 会订阅源 observable,而 subject 的观察者会收到多播通知,这正符合 RxJS 多播的基本心智模型。</p>    <p>ConnectableObservable 还有另外一个方法 <code>refCount</code>,它可以用来确定源 observable 何时产生了订阅。</p>    <p><code>refCount</code> 看上去就像是操作符,也就是说,它是在 observable 上调用的方法并且返回另一个 observable,但是它只是 <code>ConnectableObservable</code> 的方法而且不需要导入。顾名思义,<code>refCount</code> 返回 observable, 它负责维护已产生的订阅的引用计数。</p>    <p>当观察者订阅负责引用计数的 observable 时,引用计数会增加,如果前一个引用计数为0的话,负责多播基础结构的 subject 会订阅源 observable 。当观察者取消订阅时,引用计数会减少,如果引用计数归零的话,subject 会取消对源 observable 的订阅。</p>    <p>我们来使用 <code>refCount</code> :</p>    <pre>  <code class="language-javascript">const m = source.multicast(new Subject<number>()).refCount();  m.subscribe(observer("a"));  m.subscribe(observer("b"));  </code></pre>    <p>代码改变后,输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer a: complete  observer b: complete  </code></pre>    <p>只有第一个观察者收到了 <code>next</code> 通知。我们来看看原因。</p>    <p>示例中的源 observable 会立即发出通知。也就是说,一旦订阅了,源 observable 就会发出 <code>next</code> 和 <code>complete</code> 通知,<code>complete</code> 通知导致在第二个观察者订阅之前第一个就取消了订阅。当第一个取消订阅时,引用计数会归零,所以负责多播基础结构的 subject 也会取消源 observable 的订阅。</p>    <p>当第二个观察者订阅时,subject 会再次订阅源 observable,但由于 subject 已经收到了 <code>complete</code> 通知,所以它无法被重用。</p>    <p>向 <code>multicast</code> 传入 subject 的工厂函数可以解决此问题:</p>    <pre>  <code class="language-javascript">const m = source.multicast(() => new Subject<number>()).refCount();  m.subscribe(observer("a"));  m.subscribe(observer("b"));  </code></pre>    <p>代码改变后,每次源 observable 被订阅时,都会创建一个新的 subject,输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer a: complete  observer b: 54  observer b: complete  </code></pre>    <p>因为源 observable 会立即发出通知,所以观察者收到的通知是分开的。将 <code>source</code> 进行修改,以便延迟通知:</p>    <pre>  <code class="language-javascript">import { Observable } from "rxjs/Observable";  import { Subject } from "rxjs/Subject";  import "rxjs/add/observable/defer";  import "rxjs/add/observable/of";  import "rxjs/add/operator/delay";  import "rxjs/add/operator/multicast";    const source = Observable.defer(() => Observable.of(    Math.floor(Math.random() * 100)  )).delay(0);  </code></pre>    <p>观察者依然会收到多播通知,输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer b: 42  observer a: complete  observer b: complete  </code></pre>    <p>总结一下,上述示例展示了 <code>multicast</code> 操作符的以下特点:</p>    <ul>     <li>封装了多播的基础结构以符合多播的心智模型;</li>     <li>提供了 <code>connect</code> 方法以用于确定源 observable 何时产生了订阅;</li>     <li>提供了 <code>refCount</code> 方法以用于自动管理源 observable 的订阅;</li>     <li>如果使用 <code>refCount</code>,必须传入 <code>Subject</code> 的工厂函数,而不是 <code>Subject</code> 实例;</li>    </ul>    <p>接下来我们来看 <code>publish</code> 和 <code>share</code> 操作符,以及 <code>publish</code> 的变种,看看它们是如何在 <code>multicast</code> 操作符所提供的基础之上建立的。</p>    <p> </p>    <h2>publish 操作符</h2>    <p>我们通过下面的示例来看看 <code>publish</code> 操作符:</p>    <pre>  <code class="language-javascript">import { Observable } from "rxjs/Observable";  import "rxjs/add/observable/defer";  import "rxjs/add/observable/of";  import "rxjs/add/operator/delay";  import "rxjs/add/operator/publish";    function random() {    return Math.floor(Math.random() * 100);  }    const source = Observable.concat(    Observable.defer(() => Observable.of(random())),    Observable.defer(() => Observable.of(random())).delay(1)  );    function observer(name: string) {    return {      next: (value: number) => console.log(`observer ${name}: ${value}`),      complete: () => console.log(`observer ${name}: complete`)    };  }    const p = source.publish();  p.subscribe(observer("a"));  p.connect();  p.subscribe(observer("b"));  setTimeout(() => p.subscribe(observer("c")), 10);  </code></pre>    <p>示例中的源 observable 会立即发出一个随机数,经过短暂的延迟后发出另一个随机数,然后完成。这个示例可以让我们看到订阅者在 <code>connect</code> 调用前、<code>connect</code> 调用后以及调用过 publish 的 observable 完成后订阅分别会发生什么。</p>    <p><strong><code>publish</code></strong> <strong>操作符是对</strong> <strong><code>multicast</code></strong> <strong>操作符进行了一层薄薄的封装。它会调用</strong><strong><code>multicast</code></strong> <strong>并传入</strong> <code><strong>Subject</strong></code> <strong>。</strong></p>    <p>示例的输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer a: 54  observer b: 54  observer a: complete  observer b: complete  observer c: complete  </code></pre>    <p>观察者收到的通知可归纳如下:</p>    <ul>     <li><code>a</code> 是在 <code>connect</code> 调用前订阅的,所以它能收到两个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>b</code> 是在 <code>connect</code> 调用后订阅的,此时第一个立即发送的 <code>next</code> 通知已经发出过了,所以它只能收到第二个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>c</code> 是在源 observable 完成后订阅的,所以它只能收到 <code>complete</code> 通知。</li>    </ul>    <p>使用 <code>refCount</code> 来代替 <code>connect</code>:</p>    <pre>  <code class="language-javascript">const p = source.publish().refCount();  p.subscribe(observer("a"));  p.subscribe(observer("b"));  setTimeout(() => p.subscribe(observer("c")), 10);  </code></pre>    <p>示例的输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer a: 54  observer b: 54  observer a: complete  observer b: complete  observer c: complete  </code></pre>    <p>输出跟使用 <code>connect</code> 时的类似。这是为什么?</p>    <p><code>b</code> 没有收到第一个 <code>next</code> 通知是因为源 observable 的第一个 <code>next</code> 通知是立即发出的,所以只有 <code>a</code> 能收到。</p>    <p><code>c</code> 是在调用过 publish 的 observable 完成后订阅的,所以订阅的引用计数已经是0,此时将会再生成一个订阅。但是,<code>publish</code> 传给 <code>multicast</code> 的是 subject,而不是工厂函数,因为 subjects 无法被复用,所以 <code>c</code> 只能收到 <code>complete</code> 通知。</p>    <p><code>publish</code> 和 <code>multicast</code> 操作符都接受一个可选的 <code>selector</code> 函数,如果指定了此函数,操作符的行为将会有很大的不同。这将在另一篇文章 <a href="/misc/goto?guid=4959756663716116705" rel="nofollow noreferrer">multicast 的秘密</a>中详细介绍。</p>    <p> </p>    <h2>特殊类型的 subjects</h2>    <p><code>publish</code> 操作符有几个变种,它们都以一种类似的方式对 <code>multicast</code> 进行了包装,传入的是 subjects,而不是工厂函数。但是,它们传入的是不同类型的 subjects 。</p>    <p><code>publish</code> 变种使用的特殊类型的 subjects 包括:</p>    <ul>     <li><code>BehaviorSubject</code></li>     <li><code>ReplaySubject</code></li>     <li><code>AsyncSubject</code></li>    </ul>    <p>关于如何使用这些特殊类型的 subjects 的答案是: 每个变种都与一个特殊类型的 subject 相关联,当你需要的行为类似于某个 <code>publish</code> 变种时,就使用相对应的 subject 。我们来看看这些变种的行为是怎样的。</p>    <h2>publishBehavior 操作符</h2>    <p><code>publishBehavior</code> 传给 <code>multicast</code> 的是 <code>BehaviorSubject</code>,而不是 <code>Subject</code>。<code>BehaviorSubject</code> 类似于 <code>Subject</code>,但如果 subject 的订阅发生在源 observable 发出 <code>next</code> 通知之前,那么 subject 会发出包含初始值的 <code>next</code> 通知。</p>    <p>我们更改下示例,给生成随机数的源 observable 加上短暂的延迟,这样它就不会立即发出随机数:</p>    <pre>  <code class="language-javascript">const delayed = Observable.timer(1).switchMapTo(source);  const p = delayed.publishBehavior(-1);  p.subscribe(observer("a"));  p.connect();  p.subscribe(observer("b"));  setTimeout(() => p.subscribe(observer("c")), 10);  </code></pre>    <p>示例的输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: -1  observer b: -1  observer a: 42  observer b: 42  observer a: 54  observer b: 54  observer a: complete  observer b: complete  observer c: complete  </code></pre>    <p>观察者收到的通知可归纳如下:</p>    <ul>     <li><code>a</code> 是在 <code>connect</code> 调用前订阅的,所以它能收到带有 subject 的初始值的 <code>next</code> 通知、源 observable 的两个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>b</code> 是在 <code>connect</code> 调用后但在 subject 收到源 observable 的第一个 <code>next</code> 通知前订阅的,所以它能收到带有 subject 的初始值的 <code>next</code> 通知、源 observable 的两个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>c</code> 是在源 observable 完成后订阅的,所以它只能收到 <code>complete</code> 通知。</li>    </ul>    <h2>publishReplay 操作符</h2>    <p><code>publishReplay</code> 传给 <code>multicast</code> 的是 <code>ReplaySubject</code>,而不是 <code>Subject</code> 。顾名思义,每当观察者订阅时,<code>ReplaySubject</code> 会重放指定数量的 <code>next</code> 通知。</p>    <pre>  <code class="language-javascript">const p = source.publishReplay(1);  p.subscribe(observer("a"));  p.connect();  p.subscribe(observer("b"));  setTimeout(() => p.subscribe(observer("c")), 10);  </code></pre>    <p>使用了 <code>publishReplay</code>,示例的输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer b: 42  observer a: 54  observer b: 54  observer a: complete  observer b: complete  observer c: 54  observer c: complete  </code></pre>    <p>观察者收到的通知可归纳如下:</p>    <ul>     <li><code>a</code> 是在 <code>connect</code> 调用前订阅的,此时 subject 还没有收到 <code>next</code> 通知,所以 <code>a</code> 能收到源 observable 的两个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>b</code> 是在 <code>connect</code> 调用后订阅的,此时 subject 已经收到了源 observable 的第一个 <code>next</code> 通知,所以 <code>b</code> 能收到重放的 <code>next</code> 通知、源 observable 的第二个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>c</code> 是在源 observable 完成后订阅的,所以它能收到重放的 <code>next</code> 通知和 <code>complete</code>通知。</li>    </ul>    <p>来看看 <code>c</code> 的行为,很明显,不同于 <code>publish</code> 操作符,<code>publishReplay</code> 操作符适合使用 <code>refCount</code> 方法,因为观察者在源 observable 完成后订阅依然能收到任意数量的重放的 <code>next</code> 通知。</p>    <h2>publishLast 操作符</h2>    <p><code>publishLast</code> 传给 <code>multicast</code> 的是 <code>AsyncSubject</code>,而不是 <code>Subject</code>。<code>AsyncSubject</code> 是最特别的特殊类型 subjects 。只有当它完成时,才会发出 <code>next</code> 通知 (如果有 <code>next</code> 通知的话) 和 <code>complete</code> 通知,这个 <code>next</code> 通知是源 observable 中的最后一个 <code>next</code> 通知。</p>    <pre>  <code class="language-javascript">const p = source.publishLast();  p.subscribe(observer("a"));  p.connect();  p.subscribe(observer("b"));  setTimeout(() => p.subscribe(observer("c")), 10);  </code></pre>    <p>使用了 <code>publishLast</code>,示例的输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 54  observer b: 54  observer a: complete  observer b: complete  observer c: 54  observer c: complete  </code></pre>    <p>观察者收到的通知可归纳如下:</p>    <ul>     <li><code>a</code> 和 <code>b</code> 都是在源 observable 完成前订阅的,但直到源 observable 完成它们才能收到通知,它们能收到带有第二个随机数的 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>c</code> 是在源 observable 完成后订阅的,它能收到带有第二个随机数的 <code>next</code> 通知和 <code>complete</code> 通知。</li>    </ul>    <p>与 <code>publishReplay</code> 类似,<code>publishLast</code> 操作符适合使用 <code>refCount</code> 方法,因为观察者在源 observable 完成后订阅依然能收到任意数量的重放的 <code>next</code> 通知。</p>    <p> </p>    <h2>share 操作符</h2>    <p><code>share</code> 操作符类似于使用 <code>publish().refCount()</code> 。但是,<code>share</code> 传给 <code>multicast</code>的是工厂函数,这意味着在引用计数为0之后发生订阅的话,会创建一个新的 <code>Subject</code> 来订阅源 observable 。</p>    <pre>  <code class="language-javascript">const s = source.share();  s.subscribe(observer("a"));  s.subscribe(observer("b"));  setTimeout(() => s.subscribe(observer("c")), 10);  </code></pre>    <p>使用了 <code>share</code>,示例的输出如下所示:</p>    <pre>  <code class="language-javascript">observer a: 42  observer a: 54  observer b: 54  observer a: complete  observer b: complete  observer c: 6  observer c: 9  observer c: complete  </code></pre>    <p>观察者收到的通知可归纳如下:</p>    <ul>     <li><code>a</code> 订阅后立即收到第一个 <code>next</code> 通知,随后是第二个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>b</code> 只能收到第二个 <code>next</code> 通知和 <code>complete</code> 通知。</li>     <li><code>c</code> 是在源 observable 完成后订阅的,会创建一个新的 subject 来订阅源 observable,它会立即收到第一个 <code>next</code> 通知,随后是第二个 <code>next</code> 通知和 <code>complete</code> 通知。</li>    </ul>    <p>在上面这些示例中,我们介绍了 <code>publish</code> 和 <code>share</code> 操作符,当源 observable 完成时,<code>a</code> 和 <code>b</code> 会自动取消订阅。如果源 observable 报错,它们也同样会自动取消订阅。<code>publish</code> 和 <code>share</code> 操作符还有另外一个不同点:</p>    <ul>     <li>如果源 observable 报错,由 <code>publish</code> 返回的 observable 的任何将来的订阅者都将收到 <code>error</code> 通知。</li>     <li>但是,由 <code>share</code> 返回的 observable 的任何将来的订阅者会生成源 observable 的一个新订阅,因为错误会自动取消任何订阅者的订阅,将其引用计数归零。</li>    </ul>    <p>就这样了,本文到此结束。我们介绍了六个操作符,但它们全是通过一种类似的方式来实现的,它们全都符合同一个基本的心智模型: 一个源 observable、一个订阅源 observable 的 subject 和多个订阅 subject 的观察者。</p>    <p>本文只是简略地介绍了 <code>refCount</code> 方法。想要深入了解,请参见 <a href="/misc/goto?guid=4959756663820427830" rel="nofollow noreferrer">RxJS: 如何使用 refCount</a>。</p>    <p>来自:</p>    <p> </p>