踏破铁鞋无觅处,从 AsyncTask 学 Android 线程池
dsjd
8年前
<p>android对于主线程的响应时间限制的非常严格,稍有不慎就会遇到 <em>Application Not Responding(ANR)</em> 的弹框。用户可以轻点手指关掉你的APP。 同时,保持应用随时响应用户的操作也是良好用户体验的前提。</p> <h2><strong>线程的开始和结束</strong></h2> <p>要做到以上多线程是必不可少的。课本会告诉你什么时候开辟一个线程,但是很少说的一个很重要的问题是结束。比如,我现在在Activity里有一个工作需要创建一个线程执行,但是这个Activity在进入后台后不幸遇到系统回收资源被销毁了。但是这个线程还在漫无目的的游走,耗费资源。</p> <p>如何结束?先创建一个:</p> <pre> <code class="language-java">mThread = Thread(Runnable { // do something here... }) mThread?.start()</code></pre> <p>以上使用kotlin的lambda表达式简写了创建 Runnable 对象部分的代码。主旨还是创建了一个 Runnable 对象,并将其作为参数传入 Thread 。</p> <p>如何让一个 Thread 能够退出呢?这就要在 Runnable 身上下功夫了。首先添加一个是否停止的标识 isCancelled ,一旦值为true则停止线程的运行,否则继续。我们这里不讨论 Thread#interrupt() 这个方法,这个方法诡异的地方太多。</p> <p>首先要给 Runnable “添加一个属性”作为上文的是否停止的标识。直接添加时不可能的, Runnable 只是一个 <em>interface</em> ,不是 <em>class</em> 。所以要实现这个借口为一个抽象类,这样就可以添加属性了。</p> <pre> <code class="language-java">abstract class CancelableRunnable() : Runnable { var isCancelled: Boolean = false }</code></pre> <p>这里使用抽象类,是因为 run() 方法的实现留给使用的时候给出。</p> <pre> <code class="language-java">var runnable = object : CancelableRunnable() { override fun run() { if (isCancelled) { var msg = mHandler.obtainMessage(THREAD_CANCELLED) mHandler.sendMessage(msg) return } Thread.sleep(2000) if (isCancelled) { var msg = mHandler.obtainMessage(THREAD_CANCELLED) mHandler.sendMessage(msg) return } var msg = mHandler.obtainMessage(THREAD_FINISHED) mHandler.sendMessage(msg) } }</code></pre> <p>Thread.sleep(2000) 用来模拟一个费时的任务。开始之前检测是否取消了线程的执行,执行之后在检测。之后的检测是有的时候任务执行之后需要有持久化处理结果或者修改任务完成情况的标识之类的动作,如果已经取消了线程的执行,即使任务执行完成也不持久化结果、不修改完成情况。</p> <p>最后都检测完成之后如果没有取消线程,则发出任务完成执行的消息。</p> <p>发出和处理这些消息的 Handler 的定义:</p> <pre> <code class="language-java">var mHandler = object : Handler() { override fun handleMessage(msg: Message?) { when (msg?.what) { THREAD_CANCELLED -> { mResultTextView.text = "thread cancelled" } THREAD_FINISHED -> { mResultTextView.text = "thread finished" } else -> { mResultTextView.text = "what's going on" } } } }</code></pre> <p>运行在UI线程的 Handler 检测从线程发出的消息,如果是 THREAD_CANCELLED 那就是线程已经取消了,如果是 THREAD_FINISHED 那就是线程完全运行结束。之后根据message的消息设置 TextView 的文本内容。</p> <p>这里使用了两个按钮来启动和停止线程:</p> <pre> <code class="language-java">findViewById(R.id.start_button)?.setOnClickListener { v -> runnable.isCancelled = false mThread = Thread(runnable) mThread?.start() mResultTextView.text = "Thread running..." } findViewById(R.id.stop_button)?.setOnClickListener { v -> this.runnable.isCancelled = true }</code></pre> <p>上面用到的 Runnable 是只做一件事的,如果是连续不断的循环很多事的话也可以使用 <em>white</em> 语句来控制是否一直执行线程的工作。一旦设置为停止线程,则停止线程任务的循环跳出 Runnable#run() 方法,结束线程。</p> <p>完整代码放在附录中。</p> <p>所以,如果你在Activity里开辟了一个线程,在Activity被回收的时候结束线程就可以这么做:</p> <pre> <code class="language-java">override fun onDestroy() { super.onDestroy() this.runnable.isCancelled = true }</code></pre> <p>这样就再也不用担心Activity挂了,线程还阴魂不散了。</p> <h2><strong>AsyncTask</strong></h2> <p>既然缘起 AsyncTask 那就肯定需要读者一起了解一下相关的概念。</p> <p>比起来使用 Handler + Thread + Runnable 的多线程异步执行模式来说,使用 AsyncTask 是简单了非常的多的。</p> <p>先简单了解一下 AsyncTask 。</p> <pre> <code class="language-java">public abstract class AsyncTask<Params, Progress, Result></code></pre> <p>AsyncTask 是一个抽象泛型类。三个类型 <em>Params</em> , <em>Progress</em> , <em>Result</em> 分别对应的是输入参数的类型,精度更新使用的类型,最后是返回结果的类型。其中任何一个类型如果你不需要的话,可以使用 <em>java.lang.Void</em> 代替。</p> <p>继承 AsyncTask 给出自己的实现,最少需要实现 doInBackground 方法。 doInBackground 方法是在后台线程中运行的。如果要在任务执行之后更新UI线程的话还至少需要给出 onPostExecute 方法的实现,在这个方法中才可以更新UI。</p> <p>上述的两个方法已经构成了一个 AsyncTask 使用的基本单元。在后台线程处理一些任务,并在处理完成之后更新UI。但是如果一个任务比较长,只是在最后更新UI是不够的,还需要不断的提示用户已经完成的进度是多少。这就是需要另外实现 onProgressUpdate 方法。并在 doInBackground 方法中调用 publishProgress 方法发出每个任务的处理进度。</p> <p>这个 AsyncTask 总体上就是这样的了:</p> <pre> <code class="language-java">inner class DemoAsyncTask() : AsyncTask<String, Int, String>() { // var isRunning = true override fun doInBackground(vararg params: String?): String? { Log.i(TAG, "##AsyncTask doing something...") var i = 0 val TOTAL = 100000000 var progress = 0 while (i < TOTAL) { Log.d(TAG, "doning jobs $i is cancelled $isCancelled") i++ var currentProgress = i.toFloat() / TOTAL if (currentProgress > progress && Math.abs(currentProgress - progress) > 0.1) { progress = currentProgress publishProgress((progress * 100).toInt()) } } } Log.d(TAG, "doing jobs $i is cancelled $isCancelled") return "Task done" } override fun onPostExecute(result: String?) { this@CancalableActivity.mAsyncTextView?.text = result } override fun onProgressUpdate(vararg values: Int?) { mAsyncTextView?.text = "${mAsyncTextView?.text ?: "Async task..."} progress: ${values?.get(0) ?: 0}" } }</code></pre> <p>到这里各位读者应该对 AsyncTask 已经有一个总体的认识了。后台任务在 doInBackground 处理,处理过程的百分比使用 publishProgress 方法通知,并在 onProgressUpdate 方法中更新UI的百分比。最后任务处理全部完成之后在 onPostExecute 更新UI,显示全部完成。</p> <p>怎么取消一个任务的执行呢?这个机本身还上面的线程的取消基本上一样。只是 AsyncTask 已经提供了足够的属性和方法完成取消的工作。直接调用 AsyncTask#cancel 方法就可以发出取消的信号,但是是否可以取消还要看这个方法的返回值是什么。如果是 <em>true</em> 那就是可以,否则任务不可取消(但是不可取消的原因很可能是任务已经执行完了)。</p> <p>调用 cancel 方法发出取消信号,并且可以取消的时候。 isCancelled() 就会返回 <em>true</em> 。同时 onPostExecute 这个方法就不会再被调用了。而是 onCancelled(object) 方法被调用。同样是在 doInBackground 这个方法执行完之后调用。所以,如果想要在取消任务执行后尽快的调用到 onCancelled(object) 的话,就需要在 onInBackground 的时候不断的检查 isCancelled() 是否返回 <em>true</em> 。如果返回的是 <em>true</em> 就跳出方法的执行。</p> <pre> <code class="language-java">inner class DemoAsyncTask() : AsyncTask<String, Int, String>() { // var isRunning = true override fun doInBackground(vararg params: String?): String? { Log.i(TAG, "##AsyncTask doing something...") var i = 0 val TOTAL = 1000000 var progress = 0.0f while (i < TOTAL && !isCancelled) { Log.d(TAG, "doning jobs $i is cancelled $isCancelled") i++ var currentProgress = i.toFloat() / TOTAL if (currentProgress > progress && Math.abs(currentProgress - progress) > 0.1) { progress = currentProgress publishProgress((progress * 100).toInt()) } } Log.d(TAG, "doning jobs $i is cancelled $isCancelled") return "Task done" } override fun onPostExecute(result: String?) { this@CancalableActivity.mAsyncTextView?.text = result } override fun onProgressUpdate(vararg values: Int?) { mAsyncTextView?.text = "${mAsyncTextView?.text ?: "Async task..."} progress: ${values?.get(0) ?: 0}" } override fun onCancelled() { Log.i(TAG, "##Task cancelled") // isRunning = false this@CancalableActivity.mAsyncTextView?.text = "###Task cancelled" } // override fun onCancelled(result: String?) { // Log.i(TAG, "##Task cancelled") //// isRunning = false // this@CancalableActivity.mAsyncTextView?.text = result ?: "Task cancelled" // } }</code></pre> <p>onCancelled() 是API level 3的时候加入的。 onCancelled(Result result) 是API level 11的时候加入的。这个在兼容低版本的时候需要注意。</p> <p>但是一点需要格外注意:</p> <pre> <code class="language-java">AsyncTask一定要在UI线程初始化。不过在**JELLY_BEAN**以后这个问题也解决了。 总之,在UI线程初始化你的`AsyncTask`肯定是不会错的。</code></pre> <h2><strong>线程池</strong></h2> <p>下面就来看看线程池的概念。顾名思义,线程池就是放线程的池子。把费时费力,或者影响响应用户操作的代码放在另外一个线程执行时常有的事。但是如果无顾忌的开辟线程,却会适得其反,严重的浪费系统资源。于是就有了线程池。线程池就是通过 某些机制让线程不要创建那么多,能复用就复用,实在不行就让任务排队等一等 。</p> <p>这个机制在线程池的构造函数里体现的非常明显:</p> <pre> <code class="language-java">public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)</code></pre> <ul> <li> <p>corePoolSize线程池里闲着也不回收的线程数量。除非 allowCoreThreadTimeOut 指定可以回收。</p> </li> <li> <p>maximumPoolSize线程池允许的最大线程数。</p> </li> <li> <p>keepAliveTime非核心线程(就是如果核心线程数量 corePoolSize 定义为1的话,第二个就是非核心线程)的超时时间。</p> </li> <li> <p>unit keepAliveTime 的时间单位,毫秒,秒等。</p> </li> <li> <p>workQueue存放 execute(Runnable cmd) 方法提交的 Runnable 任务。</p> </li> <li> <p>threadFactory线程池用来创建新线程用的一个工厂类。</p> </li> <li> <p>handler线程池达到最大线程数,并且任务队列也已经满的时候会拒绝 execute(Runnable cmd) 方法提交任务。这个时候调用这个handler。</p> </li> </ul> <p>知道以上基本内容以后,就可以探讨线程池管理线程的机制了。概括起来有三点:</p> <ol> <li> <p>如果线程池的线程数量少于 corePoolSize 的时候,线程池会使用 threadFactory 这个线程工厂创建新的线程执行 Runnable 任务。</p> </li> <li> <p>如果线程池的线程数量大于 corePoolSize 的时候,线程池会把 Runnable 任务存放在队列 workQueue 中。</p> </li> <li> <p>线程池的线程数量大于 corePoolSize ,队列 workQueue 已满,而且小于 maximumPoolSize 的时候,线程池会创建新的线程执行 Runnable 任务。否则,任务被拒。</p> </li> </ol> <p>现在回到 AsyncTask 。被人广为诟病的 AsyncTask 是他的任务都是顺序执行的。 <strong>一个AsyncTask的实例只能处理一个任务</strong> 。但是在 AsyncTask 后面处理任务的是一个静态的线程池。在看这个线程池 SerialExecutor 的 execute 方法实现:</p> <pre> <code class="language-java">final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { // 执行一个task } }</code></pre> <p>这个线程池 SerialExecutor 在处理 Runnable 的传入参数的时候对这个任务进行了重新包装成了一个新的 Runnable 对象,并且将这个新的对象存入了一个叫做 <em>mTasks</em> 的队列。这个新的 Runnable 对象首先执行传入的任务,之后不管有无异常调用 scheduleNext 方法执行下一个。于是整体的就生成了一个传入的任务都顺序执行的逻辑。</p> <p>这个线性执行的静态线程池 SerialExecutor 的实现非常简单。并不涉及到我们前文所说的那么多复杂的内容。在实现上,这个线程池只实现了线程池的最顶层接口 Executor 。这个接口只有一个方法就是 execute(Runnable r) 。另外需要强调一点: <em>mTasks</em> 的类型 ArrayDeque<T> 是一个不受大小限制的队列。可以存放任意多的任务。在线程池的讨论中遇到队列就需要看看容量概念。</p> <p>SerialExecutor 只是进行了简单的队列排列。但是在 scheduleNext 方法的实现上又会用到一个复杂一些的线程池来执行任务的具体执行。这线程池叫做</p> <p>THREAD_POOL_EXECUTOR 。我们来具体看看其实现:</p> <pre> <code class="language-java">private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);</code></pre> <p>这个线程池的实现非常具有现实价值。虽然稍后介绍的系统提供的几种线程池的实现就够用。但是难免遇到一些需要自定义线程池的情况。详细解析如下:</p> <ul> <li> <p>CORE_POOL_SIZE线程池的核心线程数量为设备核心数加一。</p> </li> <li> <p>MAXIMUM_POOL_SIZE线程池的最大线程数量为核心数的两倍加一。</p> </li> <li> <p>KEEP_ALIVE线程池中非核心线程的超时时间为一秒。</p> </li> <li> <p>sPoolWorkQueue线程池存放任务的队列。最大个数为 <em>128</em> 个。参考上面说的线程池处理机制,会出现任务被拒的情况。排队的线程池 SerialExecutor 存放任务的队列是可以认为无限长的,但是 THREAD_POOL_EXECUTOR 的队列最多存放128个任务,加上线程池核心线程的数量,能处理的任务相对有限。出现任务被拒的情况的几率比较大。所以,往 AsyncTask 里直接添加 Runnable 对象的时候需要三思。</p> </li> <li> <p>sThreadFactory线程池用来创建线程的工厂对象。 ThreadFactory 是一个只有一个方法 Thread newThread(Runnable r); 的接口。这里在实现的时候给新创建的线程添加了一个原子计数,并把这个计数作为线程名称传递给了线程的构造函数。</p> </li> </ul> <p>到这里,我们就已经很清楚 AsyncTask 是如何用一个极其简单的线程池 SerialExecutor 给任务排队的。又是如何使用一个复杂一些的线程池 THREAD_POOL_EXECUTOR 来处理具体的任务执行的。尤其是线程池 THREAD_POOL_EXECUTOR ,在我们实际应用一个自定义的线程池的时候在设定线程池核心线程数量,线程池最大线程数量的时候都依据什么?明显就是设备的CPU核心数。线程分别在不同个CPU核心中做并行的处理。核心数多可以同时处理的线程数就相对较多,相反则会比较少一些。如此设置核心线程数量就会平衡并行处理的任务数量和在处理的过程中耗费的系统资源。</p> <p>为了让开发者省时省力,系统默认的提供了四种可以适应不同应用条件的线程池:</p> <pre> <code class="language-java">public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } }</code></pre> <ul> <li> <p>newFixedThreadPool顾名思义,线程数量固定的线程池,且其数量等于参数指定值。这一类型的线程池的核心线程数量和最大线程数量是一样的。存放任务的队列的容量可以被认为无限大。一旦线程池创建的线程数量等 <em>nThreads</em> 参数值的时候,新增的任务将会被存放在任务队列中等待核心线程可用的时候执行。</p> </li> <li> <p>newSingleThreadExecutor newFixedThreadPool 的一个特殊情况,当 <em>mThreads</em> 值为1的时候。</p> </li> <li> <p>newCachedThreadPool这一类型的线程池中创建的线程都有60秒的超时时间,由于超时时间比较长等于是线程空闲了以后被缓存了60秒。由于核心线程数量为0,所以创建的线程都是非核心线程。也因此超时时间才管用。任务队列 SynchronousQueue 非常特殊,简单理解就是一个任务都存放不了。而线程池的最大线程数量又设定为 Integer.MAX_VALUE ,可以认为是无限大。根据线程池处理任务的机制,可以认为有新任务过来就会创建一个线程去处理这个任务,但是如果存在空闲没有超时的线程会优先使用。</p> </li> <li> <p>newScheduledThreadPool生成一个 ScheduledThreadPoolExecutor 实例。可以通过其提供的接口方法设定延迟一定的时间执行或者隔一定的时间周期执行。</p> </li> </ul> <p>来一个例子:</p> <pre> <code class="language-java">import static java.util.concurrent.TimeUnit.*; class BeeperControl { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public void beepForAnHour() { final Runnable beeper = new Runnable() { public void run() { System.out.println("beep"); }; final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS); scheduler.schedule(new Runnable() { public void run() { beeperHandle.cancel(true); } }, 60 * 60, SECONDS); } }}</code></pre> <h2><strong>附录</strong></h2> <p>这里是上面例子中使用的全部代码。</p> <p>线程的停止:</p> <pre> <code class="language-java">package demo.retrofit2rxjavademo.Activities import android.os.Bundle import android.os.Handler import android.os.Message import android.support.v7.app.AppCompatActivity import android.widget.TextView import demo.retrofit2rxjavademo.R class CancalableActivity : AppCompatActivity() { lateinit var mResultTextView: TextView var mHandler = object : Handler() { override fun handleMessage(msg: Message?) { when (msg?.what) { THREAD_CANCELLED -> { mResultTextView.text = "thread cancelled" } THREAD_FINISHED -> { mResultTextView.text = "thread finished" } else -> { mResultTextView.text = "what's going on" } } } } var mThread: Thread? = null var runnable = object : CancelableRunnable() { override fun run() { if (isCancelled) { var msg = mHandler.obtainMessage(THREAD_CANCELLED) mHandler.sendMessage(msg) return } Thread.sleep(2000) if (isCancelled) { var msg = mHandler.obtainMessage(THREAD_CANCELLED) mHandler.sendMessage(msg) return } var msg = mHandler.obtainMessage(THREAD_FINISHED) mHandler.sendMessage(msg) } } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_cancalable) mResultTextView = findViewById(R.id.run_result_text_view) as TextView findViewById(R.id.start_button)?.setOnClickListener { v -> runnable.isCancelled = false mThread = Thread(runnable) mThread?.start() mResultTextView.text = "Thread running..." } findViewById(R.id.stop_button)?.setOnClickListener { v -> this.runnable.isCancelled = true } } abstract class CancelableRunnable() : Runnable { var isCancelled: Boolean = false } companion object { val THREAD_FINISHED = 0 val THREAD_CANCELLED = 1 } }</code></pre> <p> </p> <p>来自:https://segmentfault.com/a/1190000006880107</p> <p> </p>