使用 RxJS 掌控异步

3093279013 8年前
   <p><img src="https://simg.open-open.com/show/33a1cfc8de65c9ba2e416a9e1f23655c.png"></p>    <p>教你使用 RxJS 在 200 行代码内优雅的实现文件分片断点续传。</p>    <p>本文是一系列介绍 RxJS 文章的第三篇,这一系列的文章将从一个小的例子开始,逐渐深入的讲解 RxJS 在各种场景下的应用。对应的,也会有对 RxJS 各种操作符的讲解。这篇文章将接着第二篇用 RxJS 连接世界 中的例子,实现一个文件分片断点续传的例子。在例子中,会使用更多操作符(RxJS Operator) 来处理我们的业务,后续的文章中将会详细的讲解这些操作符的作用和使用场景。</p>    <h2>Intro</h2>    <p><a href="/misc/goto?guid=4959735880308530312" rel="nofollow,noindex">ben lesh </a> 经常在他的各种 talking 中将 RxJS 比作 <em>Lodash for Async</em> 用来彰显 RxJS 的强大异步控制能力,而 RxJS 对于异步而言确实有着媲美 <a href="/misc/goto?guid=4959730793293430913" rel="nofollow,noindex"> lodash </a> 之于 Array 的强大功能。与 lodash 的优秀性能类似,RxJS 在操作异步任务的时的性能也是非常优秀的,并不会因为高等级的抽象而牺牲过多的性能。本篇文章将会以一个相对复杂的异步任务为例子,逐步介绍 RxJS 如何简洁优雅的进行复杂的异步控制。</p>    <h2>准备工作</h2>    <p>在 <a href="/misc/goto?guid=4959735880437693303" rel="nofollow,noindex"> learning-rxjs </a> clone 项目所需的 seed,并基于 article3-seed checkout 一个你的 article3 分支。本文中所有涉及到 RxJS 的代码将全部使用 TypeScript 编写。</p>    <p>这篇文章中,我们将使用 RxJS 实现以下功能:</p>    <p>article3-seed 分支附带了一个简单的文件上传的 server,它的作用是实现一个简单的文件分片上传 API。</p>    <p>一共有三个 API 提供调用:</p>    <ul>     <li> <p>post /api/upload/chunk</p> <p>用来获取文件的分片信息,上传文件大小,文件 md5,文件修改时间和文件名</p> <p>服务端返回文件分片数量,每个分片大小和文件唯一的 fileKey</p>      <ul>       <li> <p>Request body:</p> </li>      </ul> <pre>  <code class="language-javascript">{    fileSize: string // 文件大小    fileMD5: string // 文件 md5    lastUpdated: ISOString // 文件上次修改时间    fileName: string // 文件名  }</code></pre>      <ul>       <li> <p>Response:</p> </li>      </ul> <pre>  <code class="language-javascript">{    chunkSize: number // 每个分片大小    chunks: number  // 分片数量    fileKey: string // 唯一文件 id  }</code></pre> </li>     <li> <p>post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks</p> <p>用来上传文件分片</p>      <ul>       <li> <p>Request header: 'Content-Type': 'application/octet-stream'</p> </li>       <li> <p>Request body: blob</p> </li>       <li> <p>Response: 'ok' or error message</p> </li>      </ul> </li>     <li> <p>post /api/upload/chunk/:fileKey</p> <p>结算文件分片,后端会将各分片拼接成一个完整的文件并返回</p> <p>Response: 'ok' or error message</p> </li>    </ul>    <p>在这几个 API 的基础上,我们将在这篇文章中实现以下的功能</p>    <ol>     <li> <p>在 add 按钮左边增加一个按钮,用来选择一个文件 & (暂停 & 恢复) 文件的上传</p> </li>     <li> <p>在增加文件后:</p>      <ul>       <li> <p>计算文件 md5 ,文件名,文件上次修改时间,文件大小等信息</p> </li>       <li> <p>调用 <em>post /api/upload/chunk</em> 接口,获取文件分片信息</p> </li>       <li> <p>根据获取的分片信息对文件分片,并且调用 <em>post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks</em> 上传文件分片,上传过程保证每次只有三个分片同时上传</p> </li>       <li> <p>上传完所有的分片后,调用 <em>post /api/upload/chunk/:fileKey</em> 结算文件</p> </li>      </ul> </li>     <li> <p>上传的过程中,input 框下有一个进度条显示上传进度</p> </li>     <li> <p>在上传开始后,选择文件的按钮变成暂停按钮,点击则暂停上传</p> </li>     <li> <p>点击暂停上传按钮后,按钮变成继续上传按钮,点击则在暂停的地方继续上传</p> </li>    </ol>    <p>为了实现上面的功能,并且将这些功能与之前的 todo app 区分开来,我们在 src 文件夹下新建一个 FileUploader.ts 文件并在这个文件中实现这些需求:</p>    <pre>  <code class="language-javascript">// FileUploader.ts  import { Observable } from 'rxjs'    // @warn memory leak  const $attachment = document.querySelector('.attachment')    export class FileUploader {      private file$ = Observable.fromEvent($attachment, 'change')      .map((r: Event) => (r.target as HTMLInputElement).files[0])      .filter(f => !!f)      uploadStream$ = this.file$  }</code></pre>    <p>在 html 中加入 attachment 节点:</p>    <pre>  <code class="language-javascript">// index.html  ...  <div class="input-group-btn">    <label class="btn btn-default btn-file glyphicon glyphicon-paperclip attachment">      <input type="file" style="display: none;">    </label>    <div class="btn btn-default button-add">Add</div>  </div>  ...</code></pre>    <p>调整一下样式:</p>    <pre>  <code class="language-javascript">// style.css  ...  .attachment {    top: 0;  }</code></pre>    <p>然后在 app.ts 中将这个我们将要实现功能的流 merge 到 app$ 中:</p>    <pre>  <code class="language-javascript">...  import { FileUploader } from './FileUploader'  ...  const uploader = new FileUploader()    const app$ = toggle$.merge(remove$, search$, uploader.uploadStream$)    .do(r => {      console.log(r)    })    app$.subscribe()</code></pre>    <p>这个时候通过 attachment 按钮选择一个文件,就已经可以在控制台中看到从 app$ 中流出的 file了:</p>    <p><img src="https://simg.open-open.com/show/3a9be73d9b449c589fcf94b681e8baec.png"></p>    <h2>获取文件分片信息</h2>    <p>我们使用 FileReader + spark-md5 计算文件的 md5 信息,其它信息直接可以从 File 对象上拿到。而这里的 FileReader 读取文件是一个异步的过程,我们将它封装成 Observable 以便和 uploadStream$ 组合:</p>    <pre>  <code class="language-javascript">import { Observable, Observer } from 'rxjs'  // spark-md5 没有第三方 .d.ts 文件,这里用 commonjs 风格的 require 它  // 如果未再 tsconfig.json 中设置 noImplicitAny: true 且 TypeScript 版本大于 2.1 则也可以用  // import * as SparkMD5 from 'spark-md5' 的方式引用  const SparkMD5 = require('spark-md5')  const $attachment = document.querySelector('.attachment')    interface FileInfo {    fileSize: number    fileMD5: string    lastUpdated: string    fileName: string  }    export class FileUploader {      private file$ = Observable.fromEvent($attachment, 'change')      .map((r: Event) => (r.target as HTMLInputElement).files[0])      .filter(f => !!f)      uploadStream$ = this.file$      .switchMap(this.readFileInfo)      private readFileInfo(file: File): Observable<{ file: File, fileinfo: FileInfo }> {      const reader = new FileReader()      const spark = new SparkMD5.ArrayBuffer()      reader.readAsArrayBuffer(file)      return Observable.create((observer: Observer<{ file: File, fileinfo: FileInfo }>) => {        reader.onload = (e: Event) => {          spark.append((e.target as FileReader).result)          const fileMD5 = spark.end()          observer.next({            file, fileinfo: {              fileMD5, fileSize: file.size,              lastUpdated: file.lastModifiedDate.toISOString(),              fileName: file.name            }          })          observer.complete()        }        return () => {          if (!reader.result) {            console.warn('read file aborted')            reader.abort()          }        }      })    }  }</code></pre>    <p>此时已经可以看到文件的 FileInfo可以从 app$ 中流出:</p>    <p><img src="https://simg.open-open.com/show/22abab58b5ca31d4d1912e363c37aaaf.png"></p>    <p>再使用文件信息通过 post /api/upload/chunk 接口获取文件的分片信息:</p>    <pre>  <code class="language-javascript">...  const apiHost = 'http://127.0.0.1:5000/api'  ...    interface ChunkMeta {    fileSize: number    chunkSize: number    chunks: number    fileKey: string  }  ...    export class FileUploader {    ...    uploadStream$ = this.file$      .switchMap(this.readFileInfo)      .switchMap(i => Observable.ajax        .post(`${apiHost}/upload/chunk`, i.fileinfo)      )  }</code></pre>    <p><img src="https://simg.open-open.com/show/c0cd0cb4d4b50f5b7f1568b52c073f5f.png"></p>    <h2>分片上传</h2>    <p>获取分片信息之后,我们首先要做的事情是将文件按照分片信息分片,做一个 slice 方法来将文件分片:</p>    <pre>  <code class="language-javascript">...  export class FileUploader {    ...    uploadStream$ = this.file$      .switchMap(this.readFileInfo)      .switchMap(i => Observable.ajax        .post(`${apiHost}/upload/chunk`, i.fileinfo)        .map((r) => {          const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)          return { blobs, chunkMeta: r.response }        })      )      ...      private slice(file: File, n: number, chunkSize: number): Blob[] {      const result: Blob[] = []      for (let i = 0; i < n; i ++) {        const startSize = i * chunkSize        const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize)        result.push(slice)      }      return result    }  }</code></pre>    <p>这时,我们就能看到分片后的 blobs 和 meta 信息:</p>    <p><img src="https://simg.open-open.com/show/0a2ab9c6cb14e04a42e9df783bd59f59.png"></p>    <p>将文件切片完成之后,我们需要实现一个上传分片的方法:</p>    <pre>  <code class="language-javascript">...  export class FileUploader {    ...      uploadStream$ = this.file$      .switchMap(this.readFileInfo)      .switchMap(i => Observable.ajax        .post(`${apiHost}/upload/chunk`, i.fileinfo)        .map((r) => {          const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)          return { blobs, chunkMeta: r.response }        })      )      .switchMap(({ blobs, chunkMeta }) => {        const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))        const uploadStream = Observable.from(dists)          .mergeAll(this.concurrency)          return Observable.forkJoin(uploadStream)          .mapTo(chunkMeta)      })      constructor(      private concurrency = 3    ) { }    ...    private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) {      const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`      return Observable.ajax({        url: host,        body: blob,        method: 'post',        crossDomain: true,        headers: { 'Content-Type': 'application/octet-stream' }      })    }  }</code></pre>    <p>这里的 uploadChunk 是上传单个文件分片的方法,uploadStream$ 中最后面一个 <strong>switchMap</strong> 中的逻辑是使用 <strong>mergeAll</strong> 操作符将所有上传的流 <strong>merge</strong> 成一个 Observable,行为就是并发的上传所有的分片。而下面的 forkJoin 操作符则是等 <strong>merge</strong> 之后的 uploadStream <strong>complete</strong> 之后再 emit 一个结果。这里的 <strong>mergeAll</strong> + <strong>forkJoin</strong> 的用法其实与 <strong>Promise.all</strong> 的行为非常类似,这里也可以写成:</p>    <pre>  <code class="language-javascript">...  const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))    return Observable.forkJoin(... dists)    .mapTo(chunkMeta)  ...</code></pre>    <p>但我们有一个需求是 <em>上传过程保证每次只有三个分片同时上传</em> , 所以需要使用 mergeAll 方法并传入 concurrency = 3 来控制并发数量,现在可以选择一个文件在 Devtool 上观察上传的行为。如果程序没有出问题行为应该是:并发上传文件分片,并且永远只有 3 个分片同时上传,在上传完所有分片后 app$ 中流出 chunkMeta 数据。</p>    <p>最后,我们只需要结算这些分片,这个文件就算上传完成了:</p>    <pre>  <code class="language-javascript">...  export class FileUploader {    ...    uploadStream$ = this.file$      .switchMap(this.readFileInfo)      .switchMap(i => Observable.ajax        .post(`${apiHost}/upload/chunk`, i.fileinfo)        .map((r) => {          const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)          return { blobs, chunkMeta: r.response }        })      )      .switchMap(({ blobs, chunkMeta }) => {        const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))        const uploadStream = Observable.from(dists)          .mergeAll(this.concurrency)          return Observable.forkJoin(uploadStream)          .mapTo(chunkMeta)      })      .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)        .mapTo({          action: 'UPLOAD_SUCCESS',          payload: r        })      )  }</code></pre>    <p>这时,选择一个文件后,可以看到它被分片上传,并且在结算后在 $app 中发送了一条数据:</p>    <pre>  <code class="language-javascript">{    "action": "UPLOAD_SUCCESS",    "payload": {      "chunkSize": 1048576,      "chunks": 26,      "fileKey": "00a12bdc10449d8ec93883a7d45292a30c",      "fileSize": 26621938    }  }</code></pre>    <p>并且在项目的 chunks 文件夹下面可以找到这个被结算的文件。</p>    <h2>进度条</h2>    <p>为了实现在界面中实时显示进度条,我们先要在 index.html 中加入进度条标签:</p>    <pre>  <code class="language-javascript">// index.html  ...   <div class="progress">     <div class="progress-bar progress-bar-success" role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">       <span>0%</span>     </div>  </div>  ...</code></pre>    <p>调整一下样式中文字的颜色:</p>    <pre>  <code class="language-javascript">// style.css  ...  .progress-bar > span {    color: black;  }</code></pre>    <p>这个时候界面看起来应该是这样的:</p>    <p><img src="https://simg.open-open.com/show/ca206db0435c5e93efed46d12fcbc1ef.png"></p>    <p>要获取总体的上传进度,必须先获取单个分片的上传进度,Observable.ajax 有一个方法可以获取 progress:..</p>    <pre>  <code class="language-javascript">...  import { Observable, Observer, Subscriber } from 'rxjs'  ...    export class FileUploader {    ...      private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {      const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`      return Observable.create((subscriber: Subscriber<ProgressEvent>) => {        const ajax$ = Observable.ajax({          url: host,          body: blob,          method: 'post',          crossDomain: true,          headers: { 'Content-Type': 'application/octet-stream' },          progressSubscriber: subscriber        })        const subscription = ajax$.subscribe()        return () => subscription.unsubscribe()      })    }  }</code></pre>    <p>这样一来我们就可以在 uploadSteram$ 中计算总体的上传进度了:</p>    <pre>  <code class="language-javascript">...  export class FileUploader {      uploadStream$ = this.file$      .switchMap(this.readFileInfo)      .switchMap(i => Observable.ajax        .post(`${apiHost}/upload/chunk`, i.fileinfo)        .map((r) => {          const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)          return { blobs, chunkMeta: r.response }        })      )      .switchMap(({ blobs, chunkMeta }) => {        const uploaded: number[] = []        const dists = blobs.map((blob, index) => {          let currentLoaded = 0          return this.uploadChunk(chunkMeta, index, blob)            .do(r => {              currentLoaded = r.loaded / chunkMeta.fileSize              uploaded[index] = currentLoaded              const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))              const p = Math.round(percent * 100)              $progressBar.style.width = `${p}%`              $progressBar.firstElementChild.textContent = `${p > 1 ? p - 1 : p} %`            })        })          const uploadStream = Observable.from(dists)          .mergeAll(this.concurrency)          return Observable.forkJoin(uploadStream)          .mapTo(chunkMeta)      })      .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)        .mapTo({          action: 'UPLOAD_SUCCESS',          payload: r        })      )      .do(() => {        $progressBar.firstElementChild.textContent = '100 %'      })  }</code></pre>    <p>这个时候我们可以在界面中看到文件分片上传的进度了。</p>    <p>而一般为了方便使用与调试,我们一般将所有的类似:</p>    <pre>  <code class="language-javascript">{    action: 'UPLOAD_SUCCESS',    payload: {      chunkSize: 1048576,      chunks: 26,      fileKey: "00a12bdc10449d8ec93883a7d45292a30c",      fileSize: 26621938    }  }</code></pre>    <p>的 local state 放在一个流里面:</p>    <pre>  <code class="language-javascript">import { Observable, Subscriber, Subject } from 'rxjs'  ...  type Action = 'pause' | 'resume' | 'progress' | 'complete'  ...  export class FileUploader {    ...    private action$ = new Subject<{      name: Action      payload?: any    }>()      private progress$ = this.action$      .filter(action => action.name === 'progress')      .map(action => action.payload)     .do(r => {        const percent = Math.round(r * 100)        $progressBar.style.width = `${percent}%`        $progressBar.firstElementChild.textContent = `${percent > 1 ? percent - 1 : percent} %`     })      .map(r => ({ action: 'PROGRESS', payload: r }))        uploadStream$ = this.file$      ...            return this.uploadChunk(chunkMeta, index, blob)            .do(r => {              currentLoaded = r.loaded / chunkMeta.fileSize              uploaded[index] = currentLoaded              const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))              this.action$.next({ name: 'progress', payload: percent })            })   ...      .merge(this.progerss$)  }</code></pre>    <p>这时控制台会出现更直观的调试信息:</p>    <p><img src="https://simg.open-open.com/show/4f5f1eed093a1ab859fd52d6b5221b26.png"></p>    <h2>暂停,续传</h2>    <p>根据需求,我们在选择文件后,选择文件的按钮将会变成一个暂停按钮,我们可以用 Observable.fromEvent来实现这个需求:</p>    <pre>  <code class="language-javascript">...  export class FileUploader {    ...      private click$ = Observable.fromEvent($attachment, 'click')      .map((e: Event) => e.target)      .filter((e: HTMLElement) => e === $attachment)      .scan((acc: number, val: HTMLElement) => {        if (val.classList.contains('glyphicon-paperclip')) {          return 1        }        if (acc === 2) {          return 3        }        return 2      }, 3)      .filter(v => v !== 1)     .do((v) => {        if (v === 2) {          this.action$.next({ name: 'pause' })          $attachment.classList.remove('glyphicon-pause')          $attachment.classList.add('glyphicon-play')        } else {          this.action$.next({ name: 'resume' })          this.buildPauseIcon()        }      })      uploadStream$ = this.file$   .switchMap...     .switchMap...     .do(() => this.buildPauseIcon())     ...      .do(() => {        $progressBar.firstElementChild.textContent = '100 %'        // restore icon        $attachment.classList.remove('glyphicon-pause')        $attachment.classList.add('glyphicon-paperclip');        ($attachment.firstElementChild as HTMLInputElement).disabled = false      })      .merge(this.progress$, this.click$)      // side effect    private buildPauseIcon() {      $attachment.classList.remove('glyphicon-paperclip')      $attachment.classList.add('glyphicon-pause');      ($attachment.firstElementChild as HTMLInputElement).disabled = true    }  }</code></pre>    <p>这段代码用到涉及到的概念比较多,我们一点点来理解:</p>    <p>在 uploadStream$ 的两个 switchMap 下插入了一个 do 操作符,这段代码的作用是将文件上传的图标变成暂停的图标。</p>    <p>然后我们新建了一个 click$ 流,为了防止事件冒泡导致的重复推送值,我们用 map + filter 过滤掉了子节点冒泡上来的事件。而为了区分点击的是 上传文件按钮还是 暂停按钮还是 继续按钮,我们用 1,2,3 三个值代表三个不同的点击事件,并使用 scan操作符不停的生成这三个状态。scan 的行为与 Array#reduce 非常相似,它接受一个 accumulator 不停的根据当前的值和状态累加出新的状态(没错,和 Redux 中的 reducer 行为一致)。而在下面的 do 操作符中我们根据不同的状态改变按钮的icon 。</p>    <p>这个时候我们观察上传的过程中,点击暂停/继续,图标的状态可以正确切换了。并且在上传完成后图标也被恢复成上传文件的初始状态了。</p>    <p><img src="https://simg.open-open.com/show/649d577e4c54730264bd6c5417a5da9d.jpg"></p>    <p>为了让整个文件上传可以暂停与继续,我们在 uploadChunk 下使用 takeUntil 与 repeatWhen & retryWhen 操作符:</p>    <pre>  <code class="language-javascript">...  export class FileUploader {     ...     private action$ = ...     private pause$ = this.action$.filter(ac => ac.name === 'pause')     private resume$ = this.action$.filter(ac => ac.name === 'resume')     private progress$ = this.action$       ...       .distinctUntilChanged((x: number, y: number) => x - y >= 0)       ...       ...       private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {       ...       return Observable.create(         ...         const ajax$ = Observable.ajax({          ...         })            .takeUntil(this.pause$)            .repeatWhen(() => this.resume$)         const subscription = ajax$.subscribe()         return () => subscription.unsubscribe()       )         .retryWhen(() => this.resume$)     }  }</code></pre>    <p><strong>takeUntil</strong> 操作符接受一个 <strong>Observable</strong> ,它在这个 <strong>Observable</strong> 发射值的时候终止上面的 <strong>Observable</strong></p>    <p><strong>repeatWhen</strong> 与 <strong>retryWhen</strong> 操作符都是接受一个 <em>projectFunc</em> ,它返回一个 <strong>Observable</strong> 并在这个 <strong>Observable</strong> 发射值的时候 重复/重试。</p>    <p>而在暂停恢复的过程中,进度条的数字可能显示错误:上传了一部分的请求被 abort,它的 progress 已经计算过一次了,重试的时候是重新上传,则可能会导致进度条后退,这时我们在 progress$ 后面用 <strong>distinctUntilChanged</strong> 方法即可实现 <strong> <em>只有在进度增长的时候发射值</em> </strong> 这一效果。</p>    <h2>结语</h2>    <p>这是一篇超级抽象的文章,并且受限于未使用框架,在程序中使用了大量的副作用操作do,总体看起来并没有特别优雅。真正优雅的 FRP 应该是将 RxJS 与 Redux + React 这样的框架结合起来,那时这个文件上传的组件就可以有更优雅的写法。当然它的功能并不完备,很多 edge case 例如各个步骤中的异常处理都没有做,但没有关系,这里只起到一个示范作用来展示 RxJS 在处理异步上的强大功能,并且让初学者有机会亲手把玩 RxJS 的各种操作符并实现一个复杂的异步场景。在后面的文章中,将会深入前面这三篇文章中涉及到或未涉及到的各种操作符,逐渐拨开 RxJS 的迷雾。</p>    <p> </p>    <p>来自:https://zhuanlan.zhihu.com/p/25059824</p>    <p> </p>