scala 中的并行计算模型,scala 中的一切皆 actor

<aside> 💡 这是一种过时的写法,可以使用 akka-actor 替代

</aside>

模型

actor 本质是一个对象,它有如下功能

  1. 接受消息,存放在自己维护的一个 mailbox (本质是一个 FIFO 的 Seq)
  2. 处理消息
    1. 创建新的 actor
    2. 指定下一个消息的处理
    3. 给其他 actor 发送消息

和 java 多线程模型的区别

任务调度

actor 的任务调度有两种方式,基于线程基于事件

基于线程

基于线程调度的方式,当 mailbox 中没有数据时会休眠等待

基于事件

基于事件触发机制,当有数据来时,触发处理

代码案例

import scala.actors.Actor

class FirstActor extends Actor {
  // 类似 run 方法
  override def act() : Unit = {
    while (true) {
      receive{
        case "start" =>
          println("starting")
          Thread.sleep(1000)
          println("started")
        case "stop" =>
          println("stopping")
          Thread.sleep(1000)
          println("stopped")
      }
    }
  }
}

// 使用 react 可以更高效,实现线程的重复利用,类似 java 中的线程池
class SecondActor extends Actor {
  // 类似 run 方法
  override def act() : Unit = {
    loop {
      react {
        case "start" =>
          println("starting")
          Thread.sleep(1000)
          println("started")
        case "stop" =>
          println("stopping")
          Thread.sleep(1000)
          println("stopped")
        case AsyncMsg(id, msg) =>
          println(s"$id + $msg")
          Thread.sleep(1000)
          sender ! ReplyMsg(id, "Async Finished")  // 给发送者返回消息
        case SyncMsg(id, msg) =>
          println(s"$id + $msg")
          Thread.sleep(1000)
          sender ! ReplyMsg(id, "Sync Finished")  // 给发送者返回消息
      }
    }
  }
}

case class SyncMsg(id: Int, msg: String)
case class AsyncMsg(id: Int, msg: String)
case class ReplyMsg(id: Int, msg: String)

object ActorTest {

  def main(args: Array[String]): Unit = {
    // 构建 actor 对象
    val actor = new SecondActor

    // 开启一个线程启动actor
    actor.start()

    // 给 actor 发送消息
    // ! 发送异步消息,没有返回值
    // !? 发送同步消息,等待返回值
    // !! 发送异步消息,有返回值 Future[Any]
    actor ! "start"
    actor ! "stop"

    val reply1 = actor ! AsyncMsg(1, "Hello actor")
    println(reply1)  // 没有返回值, 会立即打印 ()

    val reply2 = actor !? SyncMsg(2, "Hello actor")
    println(reply2)  // 会阻塞,打印返回的内容

    val reply3 = actor !! AsyncMsg(3, "hello actor")
    println(reply3)  // 是一个 Future
    println(reply3.isSet)  // 检测返回结果是否可用
    println(reply3())  // reply3() 阻塞等待结果返回
  }
}