[8] – Actor 与出新

Actor 是 Scala 基于消息传递的产出模型,虽然自 Scala-2.10
其默认并发模型的地位都为 Akka 取代,但这种与民俗
Java、C++完全不均等的面世模型依旧值得学习。

怎么下 Actor

扩展 Actor

先期来探视第一种植用法,下面是一个简便例子及有说明

//< 扩展超类 Actor
class ActorItem extends Actor {
  //< 重载 act 方法
  def act(): Unit = {
    //< receive 从消息队列 mailbox 中去一条消息并处理
    receive { case msg => println(msg) }
  }
}

object Test {
  def main (args: Array[String]): Unit = {
    val actorItem = new ActorItem
    //< 启动
    actorItem.start()

    //< 向 item 发送消息
    actorItem ! "actor test1"
    actorItem ! "actor test2"
  }
}

输出:
actor test1

这种用法在其实中并无常用,需要:

  1. 扩展超类 Actor
  2. 重载 act 方法
  3. 调用扩展类对象 start 方法

使用 scala.actors.Actor.actor 方法

老二栽方式是实在被常用并且是 Scala 社区推荐的,例子如下:

object Test {
  def main (args: Array[String]): Unit = {
    val actorItem = actor {
      receive { case msg => println(msg) }
    }

    //< 向 item 发送消息
    actorItem ! "actor test1"
    actorItem ! "actor test2"
  }
}

输出:
actor test1

此间要特别注意的凡,actor 其实是scala.actors.Actor的 actor
方法,并无是 scala 语言内建的。
这种利用方式更惠及,与第一种植扩大超类 Actor 有以下几点不同:

  1. 动 Actor.actor 方法(返回路为Actor)而无是扩张 Actor 并重载 act
    方法
  2. 结构就就起步,不需要调用 start方法(当然你调用了邪无会见生啊问题)

使用 react

除开可以下 receive 从信队列 mailbox 中取出消息并拍卖,react
同样好。receive 和 react
的分别与沟通将以下文中验证。先来看看怎么用,其实如果将点两截代码的
receive 替换成 react 即可:

class ActorItem extends Actor {
  def act(): Unit = {
    react { case msg => println(msg) }
  }
}

object Test {
  def main (args: Array[String]): Unit = {
    val actorItem = new ActorItem
    actorItem.start()

    actorItem ! "actor test1"
    actorItem ! "actor test2"
  }
}

输出:
actor test1

object Test {
  def main (args: Array[String]): Unit = {
    val actorItem = actor {
      react { case msg => println(msg) }
    }

    actorItem ! "actor test1"
    actorItem ! "actor test2"
  }
}

输出:
actor test1

连发处理消息

倘您细心考察,就见面发觉上面的每个示例中,都于 actor 发送了”actor
test1″和”actor test2″两漫长消息,但说到底只有打印了”actor
test1″这同修信息。这是坐,不管是 receive 还是 react,都止于 mailbox
中获取一长长的消息进行处理,处理终结后不见面再次获得一漫长处理。如果想如果不断从 maibox
中取消息并处理,也时有发生点儿种方式。

方式一:使用 loop。适用于扩充 Actor 和 actor 方法简单栽艺术

class ActorItem extends Actor {
  def act(): Unit = {
    loop {
      react { case msg => println(msg) }
    }
  }
}

object Test {
  def main (args: Array[String]): Unit = {
    val actorItem = new ActorItem
    actorItem.start()

    actorItem ! "actor test1"
    actorItem ! "actor test2"
  }
}

输出:
actor test1
actor test2

方式二:在 receive 处理着调用receive;在 react 处理面临调用
react。仅适用于 actor 方法这种方法

class ActorItem extends Actor {
  def act(): Unit = {
    react {
      case msg => {
        println(msg)
        act()
      }
    }
  }
}

object Test {
  def main (args: Array[String]): Unit = {
    val actorItem = new ActorItem
    actorItem.start()

    actorItem ! "actor test1"
    actorItem ! "actor test2"
  }
}

Actor是如何行事的

每个actor对象还出一个
mailbox,可以简简单单的当是一个行列,用来存放在发送给这actor的消息。
当 actor 发送信息时,它并无见面死,而当 actor
接收信息不时,它也不会见于从断。发送的信在吸纳 actor 的 mailbox
中等待处理,直到 actor 调用 receive 方法。

receive 具体是怎工作的啊?来瞧她的源码:

def receive[R](f: PartialFunction[Any, R]): R = {
  var done = false
  while (!done) {
    //< 从 mailbox 中取出一条消息
    val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
      senders = replyTo :: senders
      //< 与偏函数进行匹配,匹配失败返回 null
      val matches = f.isDefinedAt(m)
      senders = senders.tail
      matches
    })
    if (null eq qel) {
      //< 如果当前mailbox里面没有可以处理的消息,调用suspendActor,该方法会调用wait
      waitingFor = f.isDefinedAt  
      isSuspended = true 
      suspendActor()  
    } else {
      //< 执行到这里就说明成功从 mailbox 中获得匹配的消息
      received = Some(qel.msg)
      senders = qel.session :: senders
      done = true
    }
  }

  //< 成功获得消息后,调用 f.apply 来执行对应的操作
  val result = f(received.get)
  received = None
  senders = senders.tail
  result
}

同等图胜千言,下图为 receive 模型工作流程

actor_receive.jpg

和线程的关系

Actor 的线程模型可以这么敞亮:在一个历程遭到,所有的 actor
共享一个线程池,总的线程个数可以配备,也足以根据 CPU 个数控制。

当一个 actor 启动后,Scala 分配一个线程给它们采用,如果用 receive
模型,这个线程就径直也该 Actor 所有。
如果应用 react 模型,react 找到并拍卖消息继连无回来,它的回路为
Nothing,Scala 执行了 react 方法后,抛来大,调用 act 也不怕是间接调用
react 的线程会捕获这个那个,忘掉这个 actor,该线程就得给外actor
使用。
就此,如果能用 react 就尽可能用 react,可以省线程。

良好的 Actor 风格

单独透过信息及 actor 通信

选举个例子,一个 GoodActor可能会见在发朝 BadActor
的信息中含有一个对自己的援,来表明作为消息源的投机。如果 BadActor
调用了 GoodActor 的某任意的法要未是透过 “!”
发送信息之口舌,问题即使来了。被调用的法子可能读到 GoodActor
的个人实例数据,而这些数量也许是由其余一个线程写上。结果是,你要确保
BadActor 线程对这些实例数据的读取和 GoodActor
线程对这些数据的写入是同步在一个吊上之。一旦绕开了 actor
之间的消息传递机制,就返回了共享数据以及锁模型中。

优选不可变的音信

由 Scala 的 actor 模型提供了以每个 actor 的 act
方法被的单线程环境,不需操心在这方法的落实着以的靶子是不是是线程安全之。

确保信息对象是线程安全的超级途径是于信息遭运用不可变对象。任何只有 val
字段都这些字段只援引到不可变对象的好像的实例都是不可变的。

要您发现自己有一个可变的目标,想延续利用它们,同时也想用信息发送给其它一个
actor,此时应有考虑做并发送它的一个副本,比如以 clone 方法。

于信息于包含

通向某 actor 发送信息,如果你想获得这个 actor
的恢复,可以当信息备受寓我。示例如下:

class ActorItem extends Actor {
  def act(): Unit = {
    react {
      case (name: String, actor: Actor) => {
        println( name )
        actor ! "Done"
      }
    }
  }
}

object Test {
  def main (args: Array[String]): Unit = {
    val actorItem = new ActorItem
    actorItem.start()

    actorItem ! ("scala", self)

    receive {
      case msg => println( msg ) 
    }
  }
}

输出:
scala
Done

使样本类

于上例中,若将(name: String, actor: Actor)概念成类,代码可读性会大大提高

case class Info(name: String, actor: Actor)

class ActorItem extends Actor {
  def act(): Unit = {
    react {
      case Info => {
        println( Info.name )
        actor ! "Done"
      }
    }
  }
}

**传送门: **Scala
在简书目录


迎接关注自身之微信公众号:FunnyBigData

FunnyBigData