使用 RxJS 掌控异步
3093279013
8年前
<p><img src="https://simg.open-open.com/show/33a1cfc8de65c9ba2e416a9e1f23655c.png"></p> <p>教你使用 RxJS 在 200 行代码内优雅的实现文件分片断点续传。</p> <p>本文是一系列介绍 RxJS 文章的第三篇,这一系列的文章将从一个小的例子开始,逐渐深入的讲解 RxJS 在各种场景下的应用。对应的,也会有对 RxJS 各种操作符的讲解。这篇文章将接着第二篇用 RxJS 连接世界 中的例子,实现一个文件分片断点续传的例子。在例子中,会使用更多操作符(RxJS Operator) 来处理我们的业务,后续的文章中将会详细的讲解这些操作符的作用和使用场景。</p> <h2>Intro</h2> <p><a href="/misc/goto?guid=4959735880308530312" rel="nofollow,noindex">ben lesh </a> 经常在他的各种 talking 中将 RxJS 比作 <em>Lodash for Async</em> 用来彰显 RxJS 的强大异步控制能力,而 RxJS 对于异步而言确实有着媲美 <a href="/misc/goto?guid=4959730793293430913" rel="nofollow,noindex"> lodash </a> 之于 Array 的强大功能。与 lodash 的优秀性能类似,RxJS 在操作异步任务的时的性能也是非常优秀的,并不会因为高等级的抽象而牺牲过多的性能。本篇文章将会以一个相对复杂的异步任务为例子,逐步介绍 RxJS 如何简洁优雅的进行复杂的异步控制。</p> <h2>准备工作</h2> <p>在 <a href="/misc/goto?guid=4959735880437693303" rel="nofollow,noindex"> learning-rxjs </a> clone 项目所需的 seed,并基于 article3-seed checkout 一个你的 article3 分支。本文中所有涉及到 RxJS 的代码将全部使用 TypeScript 编写。</p> <p>这篇文章中,我们将使用 RxJS 实现以下功能:</p> <p>article3-seed 分支附带了一个简单的文件上传的 server,它的作用是实现一个简单的文件分片上传 API。</p> <p>一共有三个 API 提供调用:</p> <ul> <li> <p>post /api/upload/chunk</p> <p>用来获取文件的分片信息,上传文件大小,文件 md5,文件修改时间和文件名</p> <p>服务端返回文件分片数量,每个分片大小和文件唯一的 fileKey</p> <ul> <li> <p>Request body:</p> </li> </ul> <pre> <code class="language-javascript">{ fileSize: string // 文件大小 fileMD5: string // 文件 md5 lastUpdated: ISOString // 文件上次修改时间 fileName: string // 文件名 }</code></pre> <ul> <li> <p>Response:</p> </li> </ul> <pre> <code class="language-javascript">{ chunkSize: number // 每个分片大小 chunks: number // 分片数量 fileKey: string // 唯一文件 id }</code></pre> </li> <li> <p>post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks</p> <p>用来上传文件分片</p> <ul> <li> <p>Request header: 'Content-Type': 'application/octet-stream'</p> </li> <li> <p>Request body: blob</p> </li> <li> <p>Response: 'ok' or error message</p> </li> </ul> </li> <li> <p>post /api/upload/chunk/:fileKey</p> <p>结算文件分片,后端会将各分片拼接成一个完整的文件并返回</p> <p>Response: 'ok' or error message</p> </li> </ul> <p>在这几个 API 的基础上,我们将在这篇文章中实现以下的功能</p> <ol> <li> <p>在 add 按钮左边增加一个按钮,用来选择一个文件 & (暂停 & 恢复) 文件的上传</p> </li> <li> <p>在增加文件后:</p> <ul> <li> <p>计算文件 md5 ,文件名,文件上次修改时间,文件大小等信息</p> </li> <li> <p>调用 <em>post /api/upload/chunk</em> 接口,获取文件分片信息</p> </li> <li> <p>根据获取的分片信息对文件分片,并且调用 <em>post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks</em> 上传文件分片,上传过程保证每次只有三个分片同时上传</p> </li> <li> <p>上传完所有的分片后,调用 <em>post /api/upload/chunk/:fileKey</em> 结算文件</p> </li> </ul> </li> <li> <p>上传的过程中,input 框下有一个进度条显示上传进度</p> </li> <li> <p>在上传开始后,选择文件的按钮变成暂停按钮,点击则暂停上传</p> </li> <li> <p>点击暂停上传按钮后,按钮变成继续上传按钮,点击则在暂停的地方继续上传</p> </li> </ol> <p>为了实现上面的功能,并且将这些功能与之前的 todo app 区分开来,我们在 src 文件夹下新建一个 FileUploader.ts 文件并在这个文件中实现这些需求:</p> <pre> <code class="language-javascript">// FileUploader.ts import { Observable } from 'rxjs' // @warn memory leak const $attachment = document.querySelector('.attachment') export class FileUploader { private file$ = Observable.fromEvent($attachment, 'change') .map((r: Event) => (r.target as HTMLInputElement).files[0]) .filter(f => !!f) uploadStream$ = this.file$ }</code></pre> <p>在 html 中加入 attachment 节点:</p> <pre> <code class="language-javascript">// index.html ... <div class="input-group-btn"> <label class="btn btn-default btn-file glyphicon glyphicon-paperclip attachment"> <input type="file" style="display: none;"> </label> <div class="btn btn-default button-add">Add</div> </div> ...</code></pre> <p>调整一下样式:</p> <pre> <code class="language-javascript">// style.css ... .attachment { top: 0; }</code></pre> <p>然后在 app.ts 中将这个我们将要实现功能的流 merge 到 app$ 中:</p> <pre> <code class="language-javascript">... import { FileUploader } from './FileUploader' ... const uploader = new FileUploader() const app$ = toggle$.merge(remove$, search$, uploader.uploadStream$) .do(r => { console.log(r) }) app$.subscribe()</code></pre> <p>这个时候通过 attachment 按钮选择一个文件,就已经可以在控制台中看到从 app$ 中流出的 file了:</p> <p><img src="https://simg.open-open.com/show/3a9be73d9b449c589fcf94b681e8baec.png"></p> <h2>获取文件分片信息</h2> <p>我们使用 FileReader + spark-md5 计算文件的 md5 信息,其它信息直接可以从 File 对象上拿到。而这里的 FileReader 读取文件是一个异步的过程,我们将它封装成 Observable 以便和 uploadStream$ 组合:</p> <pre> <code class="language-javascript">import { Observable, Observer } from 'rxjs' // spark-md5 没有第三方 .d.ts 文件,这里用 commonjs 风格的 require 它 // 如果未再 tsconfig.json 中设置 noImplicitAny: true 且 TypeScript 版本大于 2.1 则也可以用 // import * as SparkMD5 from 'spark-md5' 的方式引用 const SparkMD5 = require('spark-md5') const $attachment = document.querySelector('.attachment') interface FileInfo { fileSize: number fileMD5: string lastUpdated: string fileName: string } export class FileUploader { private file$ = Observable.fromEvent($attachment, 'change') .map((r: Event) => (r.target as HTMLInputElement).files[0]) .filter(f => !!f) uploadStream$ = this.file$ .switchMap(this.readFileInfo) private readFileInfo(file: File): Observable<{ file: File, fileinfo: FileInfo }> { const reader = new FileReader() const spark = new SparkMD5.ArrayBuffer() reader.readAsArrayBuffer(file) return Observable.create((observer: Observer<{ file: File, fileinfo: FileInfo }>) => { reader.onload = (e: Event) => { spark.append((e.target as FileReader).result) const fileMD5 = spark.end() observer.next({ file, fileinfo: { fileMD5, fileSize: file.size, lastUpdated: file.lastModifiedDate.toISOString(), fileName: file.name } }) observer.complete() } return () => { if (!reader.result) { console.warn('read file aborted') reader.abort() } } }) } }</code></pre> <p>此时已经可以看到文件的 FileInfo可以从 app$ 中流出:</p> <p><img src="https://simg.open-open.com/show/22abab58b5ca31d4d1912e363c37aaaf.png"></p> <p>再使用文件信息通过 post /api/upload/chunk 接口获取文件的分片信息:</p> <pre> <code class="language-javascript">... const apiHost = 'http://127.0.0.1:5000/api' ... interface ChunkMeta { fileSize: number chunkSize: number chunks: number fileKey: string } ... export class FileUploader { ... uploadStream$ = this.file$ .switchMap(this.readFileInfo) .switchMap(i => Observable.ajax .post(`${apiHost}/upload/chunk`, i.fileinfo) ) }</code></pre> <p><img src="https://simg.open-open.com/show/c0cd0cb4d4b50f5b7f1568b52c073f5f.png"></p> <h2>分片上传</h2> <p>获取分片信息之后,我们首先要做的事情是将文件按照分片信息分片,做一个 slice 方法来将文件分片:</p> <pre> <code class="language-javascript">... export class FileUploader { ... uploadStream$ = this.file$ .switchMap(this.readFileInfo) .switchMap(i => Observable.ajax .post(`${apiHost}/upload/chunk`, i.fileinfo) .map((r) => { const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize) return { blobs, chunkMeta: r.response } }) ) ... private slice(file: File, n: number, chunkSize: number): Blob[] { const result: Blob[] = [] for (let i = 0; i < n; i ++) { const startSize = i * chunkSize const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize) result.push(slice) } return result } }</code></pre> <p>这时,我们就能看到分片后的 blobs 和 meta 信息:</p> <p><img src="https://simg.open-open.com/show/0a2ab9c6cb14e04a42e9df783bd59f59.png"></p> <p>将文件切片完成之后,我们需要实现一个上传分片的方法:</p> <pre> <code class="language-javascript">... export class FileUploader { ... uploadStream$ = this.file$ .switchMap(this.readFileInfo) .switchMap(i => Observable.ajax .post(`${apiHost}/upload/chunk`, i.fileinfo) .map((r) => { const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize) return { blobs, chunkMeta: r.response } }) ) .switchMap(({ blobs, chunkMeta }) => { const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob)) const uploadStream = Observable.from(dists) .mergeAll(this.concurrency) return Observable.forkJoin(uploadStream) .mapTo(chunkMeta) }) constructor( private concurrency = 3 ) { } ... private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) { const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}` return Observable.ajax({ url: host, body: blob, method: 'post', crossDomain: true, headers: { 'Content-Type': 'application/octet-stream' } }) } }</code></pre> <p>这里的 uploadChunk 是上传单个文件分片的方法,uploadStream$ 中最后面一个 <strong>switchMap</strong> 中的逻辑是使用 <strong>mergeAll</strong> 操作符将所有上传的流 <strong>merge</strong> 成一个 Observable,行为就是并发的上传所有的分片。而下面的 forkJoin 操作符则是等 <strong>merge</strong> 之后的 uploadStream <strong>complete</strong> 之后再 emit 一个结果。这里的 <strong>mergeAll</strong> + <strong>forkJoin</strong> 的用法其实与 <strong>Promise.all</strong> 的行为非常类似,这里也可以写成:</p> <pre> <code class="language-javascript">... const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob)) return Observable.forkJoin(... dists) .mapTo(chunkMeta) ...</code></pre> <p>但我们有一个需求是 <em>上传过程保证每次只有三个分片同时上传</em> , 所以需要使用 mergeAll 方法并传入 concurrency = 3 来控制并发数量,现在可以选择一个文件在 Devtool 上观察上传的行为。如果程序没有出问题行为应该是:并发上传文件分片,并且永远只有 3 个分片同时上传,在上传完所有分片后 app$ 中流出 chunkMeta 数据。</p> <p>最后,我们只需要结算这些分片,这个文件就算上传完成了:</p> <pre> <code class="language-javascript">... export class FileUploader { ... uploadStream$ = this.file$ .switchMap(this.readFileInfo) .switchMap(i => Observable.ajax .post(`${apiHost}/upload/chunk`, i.fileinfo) .map((r) => { const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize) return { blobs, chunkMeta: r.response } }) ) .switchMap(({ blobs, chunkMeta }) => { const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob)) const uploadStream = Observable.from(dists) .mergeAll(this.concurrency) return Observable.forkJoin(uploadStream) .mapTo(chunkMeta) }) .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`) .mapTo({ action: 'UPLOAD_SUCCESS', payload: r }) ) }</code></pre> <p>这时,选择一个文件后,可以看到它被分片上传,并且在结算后在 $app 中发送了一条数据:</p> <pre> <code class="language-javascript">{ "action": "UPLOAD_SUCCESS", "payload": { "chunkSize": 1048576, "chunks": 26, "fileKey": "00a12bdc10449d8ec93883a7d45292a30c", "fileSize": 26621938 } }</code></pre> <p>并且在项目的 chunks 文件夹下面可以找到这个被结算的文件。</p> <h2>进度条</h2> <p>为了实现在界面中实时显示进度条,我们先要在 index.html 中加入进度条标签:</p> <pre> <code class="language-javascript">// index.html ... <div class="progress"> <div class="progress-bar progress-bar-success" role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%"> <span>0%</span> </div> </div> ...</code></pre> <p>调整一下样式中文字的颜色:</p> <pre> <code class="language-javascript">// style.css ... .progress-bar > span { color: black; }</code></pre> <p>这个时候界面看起来应该是这样的:</p> <p><img src="https://simg.open-open.com/show/ca206db0435c5e93efed46d12fcbc1ef.png"></p> <p>要获取总体的上传进度,必须先获取单个分片的上传进度,Observable.ajax 有一个方法可以获取 progress:..</p> <pre> <code class="language-javascript">... import { Observable, Observer, Subscriber } from 'rxjs' ... export class FileUploader { ... private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> { const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}` return Observable.create((subscriber: Subscriber<ProgressEvent>) => { const ajax$ = Observable.ajax({ url: host, body: blob, method: 'post', crossDomain: true, headers: { 'Content-Type': 'application/octet-stream' }, progressSubscriber: subscriber }) const subscription = ajax$.subscribe() return () => subscription.unsubscribe() }) } }</code></pre> <p>这样一来我们就可以在 uploadSteram$ 中计算总体的上传进度了:</p> <pre> <code class="language-javascript">... export class FileUploader { uploadStream$ = this.file$ .switchMap(this.readFileInfo) .switchMap(i => Observable.ajax .post(`${apiHost}/upload/chunk`, i.fileinfo) .map((r) => { const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize) return { blobs, chunkMeta: r.response } }) ) .switchMap(({ blobs, chunkMeta }) => { const uploaded: number[] = [] const dists = blobs.map((blob, index) => { let currentLoaded = 0 return this.uploadChunk(chunkMeta, index, blob) .do(r => { currentLoaded = r.loaded / chunkMeta.fileSize uploaded[index] = currentLoaded const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0)) const p = Math.round(percent * 100) $progressBar.style.width = `${p}%` $progressBar.firstElementChild.textContent = `${p > 1 ? p - 1 : p} %` }) }) const uploadStream = Observable.from(dists) .mergeAll(this.concurrency) return Observable.forkJoin(uploadStream) .mapTo(chunkMeta) }) .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`) .mapTo({ action: 'UPLOAD_SUCCESS', payload: r }) ) .do(() => { $progressBar.firstElementChild.textContent = '100 %' }) }</code></pre> <p>这个时候我们可以在界面中看到文件分片上传的进度了。</p> <p>而一般为了方便使用与调试,我们一般将所有的类似:</p> <pre> <code class="language-javascript">{ action: 'UPLOAD_SUCCESS', payload: { chunkSize: 1048576, chunks: 26, fileKey: "00a12bdc10449d8ec93883a7d45292a30c", fileSize: 26621938 } }</code></pre> <p>的 local state 放在一个流里面:</p> <pre> <code class="language-javascript">import { Observable, Subscriber, Subject } from 'rxjs' ... type Action = 'pause' | 'resume' | 'progress' | 'complete' ... export class FileUploader { ... private action$ = new Subject<{ name: Action payload?: any }>() private progress$ = this.action$ .filter(action => action.name === 'progress') .map(action => action.payload) .do(r => { const percent = Math.round(r * 100) $progressBar.style.width = `${percent}%` $progressBar.firstElementChild.textContent = `${percent > 1 ? percent - 1 : percent} %` }) .map(r => ({ action: 'PROGRESS', payload: r })) uploadStream$ = this.file$ ... return this.uploadChunk(chunkMeta, index, blob) .do(r => { currentLoaded = r.loaded / chunkMeta.fileSize uploaded[index] = currentLoaded const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0)) this.action$.next({ name: 'progress', payload: percent }) }) ... .merge(this.progerss$) }</code></pre> <p>这时控制台会出现更直观的调试信息:</p> <p><img src="https://simg.open-open.com/show/4f5f1eed093a1ab859fd52d6b5221b26.png"></p> <h2>暂停,续传</h2> <p>根据需求,我们在选择文件后,选择文件的按钮将会变成一个暂停按钮,我们可以用 Observable.fromEvent来实现这个需求:</p> <pre> <code class="language-javascript">... export class FileUploader { ... private click$ = Observable.fromEvent($attachment, 'click') .map((e: Event) => e.target) .filter((e: HTMLElement) => e === $attachment) .scan((acc: number, val: HTMLElement) => { if (val.classList.contains('glyphicon-paperclip')) { return 1 } if (acc === 2) { return 3 } return 2 }, 3) .filter(v => v !== 1) .do((v) => { if (v === 2) { this.action$.next({ name: 'pause' }) $attachment.classList.remove('glyphicon-pause') $attachment.classList.add('glyphicon-play') } else { this.action$.next({ name: 'resume' }) this.buildPauseIcon() } }) uploadStream$ = this.file$ .switchMap... .switchMap... .do(() => this.buildPauseIcon()) ... .do(() => { $progressBar.firstElementChild.textContent = '100 %' // restore icon $attachment.classList.remove('glyphicon-pause') $attachment.classList.add('glyphicon-paperclip'); ($attachment.firstElementChild as HTMLInputElement).disabled = false }) .merge(this.progress$, this.click$) // side effect private buildPauseIcon() { $attachment.classList.remove('glyphicon-paperclip') $attachment.classList.add('glyphicon-pause'); ($attachment.firstElementChild as HTMLInputElement).disabled = true } }</code></pre> <p>这段代码用到涉及到的概念比较多,我们一点点来理解:</p> <p>在 uploadStream$ 的两个 switchMap 下插入了一个 do 操作符,这段代码的作用是将文件上传的图标变成暂停的图标。</p> <p>然后我们新建了一个 click$ 流,为了防止事件冒泡导致的重复推送值,我们用 map + filter 过滤掉了子节点冒泡上来的事件。而为了区分点击的是 上传文件按钮还是 暂停按钮还是 继续按钮,我们用 1,2,3 三个值代表三个不同的点击事件,并使用 scan操作符不停的生成这三个状态。scan 的行为与 Array#reduce 非常相似,它接受一个 accumulator 不停的根据当前的值和状态累加出新的状态(没错,和 Redux 中的 reducer 行为一致)。而在下面的 do 操作符中我们根据不同的状态改变按钮的icon 。</p> <p>这个时候我们观察上传的过程中,点击暂停/继续,图标的状态可以正确切换了。并且在上传完成后图标也被恢复成上传文件的初始状态了。</p> <p><img src="https://simg.open-open.com/show/649d577e4c54730264bd6c5417a5da9d.jpg"></p> <p>为了让整个文件上传可以暂停与继续,我们在 uploadChunk 下使用 takeUntil 与 repeatWhen & retryWhen 操作符:</p> <pre> <code class="language-javascript">... export class FileUploader { ... private action$ = ... private pause$ = this.action$.filter(ac => ac.name === 'pause') private resume$ = this.action$.filter(ac => ac.name === 'resume') private progress$ = this.action$ ... .distinctUntilChanged((x: number, y: number) => x - y >= 0) ... ... private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> { ... return Observable.create( ... const ajax$ = Observable.ajax({ ... }) .takeUntil(this.pause$) .repeatWhen(() => this.resume$) const subscription = ajax$.subscribe() return () => subscription.unsubscribe() ) .retryWhen(() => this.resume$) } }</code></pre> <p><strong>takeUntil</strong> 操作符接受一个 <strong>Observable</strong> ,它在这个 <strong>Observable</strong> 发射值的时候终止上面的 <strong>Observable</strong></p> <p><strong>repeatWhen</strong> 与 <strong>retryWhen</strong> 操作符都是接受一个 <em>projectFunc</em> ,它返回一个 <strong>Observable</strong> 并在这个 <strong>Observable</strong> 发射值的时候 重复/重试。</p> <p>而在暂停恢复的过程中,进度条的数字可能显示错误:上传了一部分的请求被 abort,它的 progress 已经计算过一次了,重试的时候是重新上传,则可能会导致进度条后退,这时我们在 progress$ 后面用 <strong>distinctUntilChanged</strong> 方法即可实现 <strong> <em>只有在进度增长的时候发射值</em> </strong> 这一效果。</p> <h2>结语</h2> <p>这是一篇超级抽象的文章,并且受限于未使用框架,在程序中使用了大量的副作用操作do,总体看起来并没有特别优雅。真正优雅的 FRP 应该是将 RxJS 与 Redux + React 这样的框架结合起来,那时这个文件上传的组件就可以有更优雅的写法。当然它的功能并不完备,很多 edge case 例如各个步骤中的异常处理都没有做,但没有关系,这里只起到一个示范作用来展示 RxJS 在处理异步上的强大功能,并且让初学者有机会亲手把玩 RxJS 的各种操作符并实现一个复杂的异步场景。在后面的文章中,将会深入前面这三篇文章中涉及到或未涉及到的各种操作符,逐渐拨开 RxJS 的迷雾。</p> <p> </p> <p>来自:https://zhuanlan.zhihu.com/p/25059824</p> <p> </p>