scala 中的并行计算模型,scala 中的一切皆 actor
<aside> 💡 这是一种过时的写法,可以使用 akka-actor 替代
</aside>
actor 本质是一个对象,它有如下功能
actor 的任务调度有两种方式,基于线程
和基于事件
基于线程调度的方式,当 mailbox 中没有数据时会休眠等待
基于事件触发机制,当有数据来时,触发处理
import scala.actors.Actor
class FirstActor extends Actor {
// 类似 run 方法
override def act() : Unit = {
while (true) {
receive{
case "start" =>
println("starting")
Thread.sleep(1000)
println("started")
case "stop" =>
println("stopping")
Thread.sleep(1000)
println("stopped")
}
}
}
}
// 使用 react 可以更高效,实现线程的重复利用,类似 java 中的线程池
class SecondActor extends Actor {
// 类似 run 方法
override def act() : Unit = {
loop {
react {
case "start" =>
println("starting")
Thread.sleep(1000)
println("started")
case "stop" =>
println("stopping")
Thread.sleep(1000)
println("stopped")
case AsyncMsg(id, msg) =>
println(s"$id + $msg")
Thread.sleep(1000)
sender ! ReplyMsg(id, "Async Finished") // 给发送者返回消息
case SyncMsg(id, msg) =>
println(s"$id + $msg")
Thread.sleep(1000)
sender ! ReplyMsg(id, "Sync Finished") // 给发送者返回消息
}
}
}
}
case class SyncMsg(id: Int, msg: String)
case class AsyncMsg(id: Int, msg: String)
case class ReplyMsg(id: Int, msg: String)
object ActorTest {
def main(args: Array[String]): Unit = {
// 构建 actor 对象
val actor = new SecondActor
// 开启一个线程启动actor
actor.start()
// 给 actor 发送消息
// ! 发送异步消息,没有返回值
// !? 发送同步消息,等待返回值
// !! 发送异步消息,有返回值 Future[Any]
actor ! "start"
actor ! "stop"
val reply1 = actor ! AsyncMsg(1, "Hello actor")
println(reply1) // 没有返回值, 会立即打印 ()
val reply2 = actor !? SyncMsg(2, "Hello actor")
println(reply2) // 会阻塞,打印返回的内容
val reply3 = actor !! AsyncMsg(3, "hello actor")
println(reply3) // 是一个 Future
println(reply3.isSet) // 检测返回结果是否可用
println(reply3()) // reply3() 阻塞等待结果返回
}
}