【Akka】在并发程序中使用Future

tu7648 9年前

引言

在Akka中, 一个 Future 是用来获取某个并发操作的结果的数据结构。这个操作通常是由Actor执行或由Dispatcher直接执行的. 这个结果可以以同步(阻塞)或异步(非阻塞)的方式访问。

Future提供了一种简单的方式来执行并行算法。

Future直接使用

Future中的一个常见用例是在不需要使用Actor的情况下并发地执行计算。Future有两种使用方式:

  1. 阻塞方式(Blocking):该方式下,父actor或主程序停止执行知道所有future完成各自任务。通过 scala.concurrent.Await 使用。
  2. 非阻塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在执行期间启动future,future任务和父actor并行执行,当每个future完成任务,将通知父actor。通过 onComplete 、 onSuccess 、 onFailure 方式使用。

执行上下文(ExecutionContext)

为了运行回调和操作,Futures需要有一个 ExecutionContext 。

如果你在作用域内有一个 ActorSystem ,它会它自己派发器用作ExecutionContext,你也可以用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹,或者甚至创建自己的实例。

通过导入 ExecutionContext.Implicits.global 来导入默认的全局执行上下文。你可以把该执行上下文看做是一个线程池,ExecutionContext是在某个线程池执行任务的抽象。

如果在代码中没有导入该执行上下文,代码将无法编译。

阻塞方式

第一个例子展示如何创建一个future,然后通过阻塞方式等待其计算结果。虽然阻塞方式不是一个很好的用法,但是可以说明问题。这个例子中,通过在未来某个时间计算1+1,当计算结果后再返回。

importscala.concurrent.{Await,Future}  importscala.concurrent.duration._  importscala.concurrent.ExecutionContext.Implicits.global    objectFutureBlockDemoextendsApp{   implicit val baseTime = System.currentTimeMillis     // create a Future   val f = Future{  Thread.sleep(500)  1+1   }   // this isblocking(blockingisbad)   val result=Await.result(f,1second)   // 如果Future没有在Await规定的时间里返回,   // 将抛出java.util.concurrent.TimeoutException   println(result)  Thread.sleep(1000)  }

代码解释:

  1. 在上面的代码中,被传递给Future的代码块会被缺省的 Dispatcher 所执行,代码块的返回结果会被用来完成 Future 。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。
  2. Await.result 方法将阻塞1秒时间来等待Future结果返回,如果Future在规定时间内没有返回,将抛出 java.util.concurrent.TimeoutException 异常。
  3. 通过导入 scala.concurrent.duration._ ,可以用一种方便的方式来声明时间间隔,如 100 nanos , 500 millis , 5 seconds 、 1 minute 、 1 hour , 3 days 。还可以通过 Duration(100, MILLISECONDS) , Duration(200, "millis") 来创建时间间隔。

非阻塞方式(回调方式)

有时你只需要监听 Future 的完成事件,对其进行响应,不是创建新的Future,而仅仅是产生副作用。

通过 onComplete , onSuccess , onFailure 三个回调函数来异步执行Future任务,而后两者仅仅是第一项的特例。

使用onComplete的代码示例:

importscala.concurrent.{Future}  importscala.concurrent.ExecutionContext.Implicits.global  importscala.util.{Failure, Success}  importscala.util.Random    objectFutureNotBlockextendsApp{   println("starting calculation ...")  valf = Future {   Thread.sleep(Random.nextInt(500))  42   }     println("before onComplete")   f.onComplete{  caseSuccess(value) => println(s"Got the callback, meaning = $value")  caseFailure(e) => e.printStackTrace   }    // do the rest of your work   println("A ...")   Thread.sleep(100)   println("B ....")   Thread.sleep(100)   println("C ....")   Thread.sleep(100)   println("D ....")   Thread.sleep(100)   println("E ....")   Thread.sleep(100)     Thread.sleep(2000)  }

使用onSuccess、onFailure的代码示例:

importscala.concurrent.{Future}  importscala.concurrent.ExecutionContext.Implicits.global  importscala.util.{Failure, Success}  importscala.util.Random    objectTest12_FutureOnSuccessAndFailureextendsApp{  valf = Future {   Thread.sleep(Random.nextInt(500))  if(Random.nextInt(500) >250)thrownewException("Tikes!")else42   }     f onSuccess {  caseresult => println(s"Success: $result")   }     f onFailure {  caset => println(s"Exception: ${t.getMessage}")   }    // do the rest of your work   println("A ...")   Thread.sleep(100)   println("B ....")   Thread.sleep(100)   println("C ....")   Thread.sleep(100)   println("D ....")   Thread.sleep(100)   println("E ....")   Thread.sleep(100)     Thread.sleep(1000)  }

代码解释:

上面两段例子中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。

然后在回调函数中进行相关处理。

创建返回Future[T]的方法

先看一下示例:

importscala.concurrent.{Await,Future, future}  importscala.concurrent.ExecutionContext.Implicits.global  importscala.util.{Failure,Success}    objectReturnFutureextendsApp{   implicit val baseTime = System.currentTimeMillis     // `future` methodisanother way to create a future   // Itstarts the computation asynchronouslyandretures aFuture[Int] that   // will hold the resultofthe computation.   def longRunningComputation(i: Int):Future[Int] = future {  Thread.sleep(100)   i + 1   }     // this does notblock   longRunningComputation(11).onComplete {  caseSuccess(result) => println(s"result = $result")  caseFailure(e) => e.printStackTrace   }     // keep the jvm fromshutting down  Thread.sleep(1000)  }

代码解释:

上面代码中的longRunningComputation返回一个 Future[Int] ,然后进行相关的异步操作。

其中 future 方法是创建一个future的另一种方法。它将启动一个异步计算并且返回包含计算结果的 Future[T] 。

Future用于Actor

通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息 actor ! msg ,这种方法只在发送者是一个Actor时有效;第二种是通过一个Future。

使用Actor的 ? 方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是:

importscala.concurrent.Await  importakka.pattern.ask  importscala.concurrent.duration._  importakka.util.Timeout    implicit val timeout = Timeout(5seconds)  val future = actor ? msg  val result=Await.result(future, timeout.duration).asInstanceOf[String]

下面是使用 ? 发送消息给actor,并等待回应的代码示例:

importakka.actor._  importakka.pattern.ask  importakka.util.Timeout  importscala.concurrent.{Await, Future}  importscala.language.postfixOps  importscala.concurrent.duration._    caseobjectAskNameMessage    classTestActorextendsActor{  defreceive = {  caseAskNameMessage =>// respond to the 'ask' request   sender ! "Fred"  case_ => println("that was unexpected")   }  }  objectAskDemoextendsApp{  //create the system and actor  valsystem = ActorSystem("AskDemoSystem")  valmyActor = system.actorOf(Props[TestActor], name="myActor")    // (1) this is one way to "ask" another actor for information   implicit valtimeout = Timeout(5seconds)  valfuture = myActor ? AskNameMessage  valresult = Await.result(future, timeout.duration).asInstanceOf[String]   println(result)    // (2) a slightly different way to ask another actor for information  valfuture2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]  valresult2 = Await.result(future2,1second)   println(result2)     system.shutdown  }

代码解释:

  1. Await.result(future, timeout.duration).asInstanceOf[String] 会导致当前线程被阻塞,并等待actor通过它的应答来完成 Future 。但是阻塞会导致性能问题,所以是不推荐的。致阻塞的操作位于 Await.result 和 Await.ready 中,这样就方便定位阻塞的位置。
  2. 还要注意actor返回的Future的类型是 Future[Any] ,这是因为actor是动态的。 这也是为什么上例中注释(1)使用了 asInstanceOf 。
  3. 在使用非阻塞方式时,最好使用 mapTo 方法来将Future转换到期望的类型。如果转换成功, mapTo 方法会返回一个包含结果的新的 Future,如果不成功,则返回 ClassCastException 异常。
</div>

来自: http://jasonding1354.github.io/2016/01/19/Scala/【Akka】在并发程序中使用Future/