[8] – Actor 与并发

澳门新葡亰网站注册 2

本 系列 中以前的文章介绍了如何通过以下方式实现并发性:

Actor 是 Scala 基于消息传递的并发模型,虽然自 Scala-2.10
其默认并发模型的地位已被 Akka 取代,但这种与传统
Java、C++完全不一样的并发模型依旧值得学习。

  • 并行地在多个数据集上执行相同的操作(就像 Java 8 流一样)
  • 显式地将计算构建成异步执行某些操作,然后将结果组合在一起(就像
    future 一样)。

扩展 Actor

先来看看第一种用法,下面是一个简单例子及部分说明

//< 扩展超类 Actorclass ActorItem extends Actor { //< 重载 act 方法 def act(): Unit = { //< receive 从消息队列 mailbox 中去一条消息并处理 receive { case msg => println } }}object Test { def main (args: Array[String]): Unit = { val actorItem = new ActorItem //< 启动 actorItem.start() //< 向 item 发送消息 actorItem ! "actor test1" actorItem ! "actor test2" }}

输出:actor test1

这种用法在实际中并不常用,需要:

  1. 扩展超类 Actor
  2. 重载 act 方法
  3. 调用扩展类对象 start 方法

这两种方法都是实现并发性的不错方式,但是您必须将它们明确地设计到应用程序中。

使用 scala.actors.Actor.actor 方法

第二种方式是实际中常用并且是 Scala 社区推荐的,例子如下:

object Test { def main (args: Array[String]): Unit = { val actorItem = actor { receive { case msg => println } } //< 向 item 发送消息 actorItem ! "actor test1" actorItem ! "actor test2" }}

输出:actor test1

这里需要特别注意的是,actor 其实是scala.actors.Actor的 actor
方法,并不是 scala 语言内建的。这种使用方法更加方便,与第一种扩展超类
Actor 有以下几点不同:

  1. 使用 Actor.actor 方法(返回类型为Actor)而不是扩展 Actor 并重载 act
    方法
  2. 构造完成即启动,不需要调用 start方法(当然你调用了也不会有什么问题)

在本文和接下来的几篇文章中,我将着重介绍一种不同的并发性实现方法,该方法基于一种特定的程序结构,与显式编码方法不同。这种程序结构就是 actor
模型
。您将了解如何使用 actor
模型的 Akka 实现。(Akka 是一个构建并发和分布式 JVM
应用程序的工具包和运行时。)请参阅 参考资料,获取本文完整示例代码的链接。

使用 react

除了可以使用 receive 从消息队列 mailbox 中取出消息并处理,react
同样可以。receive 和 react
的区别与联系将在下文中说明。先来看看怎么用,其实只要把上面两段代码的
receive 替换成 react 即可:

class ActorItem extends Actor { def act(): Unit = { react { case msg => println } }}object Test { def main (args: Array[String]): Unit = { val actorItem = new ActorItem actorItem.start() actorItem ! "actor test1" actorItem ! "actor test2" }}

输出:actor test1

object Test { def main (args: Array[String]): Unit = { val actorItem = actor { react { case msg => println } } actorItem ! "actor test1" actorItem ! "actor test2" }}

输出:actor test1

Actor 模型基础知识

用于并发计算的 actor 模型基于各种称为 actor 的原语来构建系统。Actor
执行操作来响应称为消息 的输入。这些操作包括更改 actor
自己的内部状态,以及发出其他消息和创建其他
actor。所有消息都是异步交付的,因此将消息发送方与接收方分开。正是由于这种分离,导致
actor 系统具有内在的并发性:可以不受限制地并行执行任何拥有输入消息的
actor。

在 Akka 术语中,actor
看起来就像是某种通过消息进行交互的行为神经束。像真实世界的演员一样,Akka
actor 也需要一定程度的隐私。您不能直接将消息发送给 Akka
actor。相反,需要将消息发送给等同于邮政信箱的
actor 引用。然后通过该引用将传入的消息路由到 actor
的邮箱,以后再传送给 actor。Akka actor
甚至要求所有传入的消息都是无菌的(或者在 JVM
术语中叫做不可变的),以免受到其他 actor 的污染。

与一些真实世界中演员的需求不同,Akka
中由于某种原因而存在一些看似强制要求的限制。使用 actor
的引用可阻止交换消息以外的任何交互,这些交互可能破坏 actor
模型核心上的解耦本质。Actor 在执行上是单线程的(不超过 1
个线程执行一个特定的 actor
实例),所以邮箱充当着一个缓冲器,在处理消息前会一直保存这些消息。消息的不可变性(由于
JVM 的限制,目前未由 Akka
强制执行,但这是一项既定的要求)意味着根本无需担心可能影响 actor
之间各种共享的数据的同步问题;如果只有共享的数据是不可变的,那么根本不需要同步。

持续处理消息

如果你仔细观察,就会发现上面的每个示例中,都向 actor 发送了”actor
test1″和”actor test2″两条消息,但最终只打印了”actor
test1″这一条消息。这是因为,不管是 receive 还是 react,都只从 mailbox
中取一条消息进行处理,处理完之后不会再取一条处理。如果想要持续从 maibox
中取消息并处理,也有两种方式。

方式一:使用 loop。适用于扩展 Actor 和 actor 方法两种方式

class ActorItem extends Actor { def act(): Unit = { loop { react { case msg => println } } }}object Test { def main (args: Array[String]): Unit = { val actorItem = new ActorItem actorItem.start() actorItem ! "actor test1" actorItem ! "actor test2" }}

输出:actor test1actor test2

方式二:在 receive 处理中调用receive;在 react 处理中调用
react。仅适用于 actor 方法这种方法

class ActorItem extends Actor { def act(): Unit = { react { case msg => { println act() } } }}object Test { def main (args: Array[String]): Unit = { val actorItem = new ActorItem actorItem.start() actorItem ! "actor test1" actorItem ! "actor test2" }}

每个actor对象都有一个
mailbox,可以简单的认为是一个队列,用来存放发送给这个actor的消息。当
actor 发送消息时,它并不会阻塞,而当 actor
接收消息时,它也不会被打断。发送的消息在接收 actor 的 mailbox
中等待处理,直到 actor 调用 receive 方法。

receive 具体是怎么工作的呢?来看看它的源码:

def receive[R](f: PartialFunction[Any, R]): R = { var done = false while  { //< 从 mailbox 中取出一条消息 val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { senders = replyTo :: senders //< 与偏函数进行匹配,匹配失败返回 null val matches = f.isDefinedAt senders = senders.tail matches }) if (null eq qel) { //< 如果当前mailbox里面没有可以处理的消息,调用suspendActor,该方法会调用wait waitingFor = f.isDefinedAt isSuspended = true suspendActor() } else { //< 执行到这里就说明成功从 mailbox 中获得匹配的消息 received = Some senders = qel.session :: senders done = true } } //< 成功获得消息后,调用 f.apply 来执行对应的操作 val result = f(received.get) received = None senders = senders.tail result}

一图胜千言,下图为 receive 模型工作流程

澳门新葡亰网站注册 1actor_receive.jpg

开始实现

现在您已大体了解了 actor 模型和 Akka 细节,是时候看些代码了。使用 hello
作为编码示例司空见惯,但它确实能够帮助用户快速、轻松地理解一种语言或系统。清单
1 显示了 Scala 中的一个 Akka 版本。

与线程的关系

Actor 的线程模型可以这样理解:在一个进程中,所有的 actor
共享一个线程池,总的线程个数可以配置,也可以根据 CPU 个数决定。

当一个 actor 启动后,Scala 分配一个线程给它使用,如果使用 receive
模型,这个线程就一直为该 Actor 所有。如果使用 react 模型,react
找到并处理消息后并不返回,它的返回类型为 Nothing,Scala 执行完 react
方法后,抛出异常,调用 act 也就是间接调用 react
的线程会捕获这个异常,忘掉这个 actor,该线程就可以被其他actor
使用。所以,如果能用 react 就尽量使用 react,可以节省线程。

清单 1. 简单的 Scala hello
import akka.actor._
import akka.util._

/** Simple hello from an actor in Scala. */
object Hello1 extends App {

  val system = ActorSystem("actor-demo-scala")
  val hello = system.actorOf(Props[Hello])
  hello ! "Bob"
  Thread sleep 1000
  system shutdown

  class Hello extends Actor {
    def receive = {
      case name: String => println(s"Hello $name")
    }
  }
}

清单 1
中的代码分为两个单独的代码段,它们都包含在 Hello1 应用程序项目中。第一个代码段是
Akka 应用程序基础架构,它:

  1. 创建一个 actor 系统(ActorSystem(...) 行)。
  2. 在系统内创建一个 actor(system.actorOf(...) 行,它为所创建的 actor
    返回一个 actor 引用)。
  3. 使用 actor 引用向 actor 发送消息(hello !"Bob" 行)。
  4. 等待一秒钟,然后关闭 actor 系统(system shutdown 行)。

system.actorOf(Props[Hello]) 调用是创建 actor
实例的推荐方式,它使用了专门用于 Hello actor
类型的配置属性。对于这个简单的
actor(扮演一个小角色,只有一句台词),没有配置信息,所以 Props 对象没有参数。如果想在您的
actor 上设置某种配置,可专门为该 actor
定义一个其中包含了所有必要信息的 Props 类。(后面的示例会展示如何操作。)

hello !"Bob" 语句将一条消息(在本例中为字符串 Bob)发送给已创建的
actor。! 运算符是 Akka 中表示将一条消息发送到 actor
的便捷方式,采用了触发即忘的模式。如果不喜欢这种特殊的运算符风格,可使用 tell() 方法实现相同的功能。

第二段代码是 Hello actor
定义,以 class Hello extends Actor 开头。这个特定的 actor
定义非常简单。它定义必需的(对于所有
actor)局部函数 receive,该函数实现了传入消息的处理方式。(receive 是一个局部函数澳门新葡亰网站注册,,因为仅为一些输入定义了它
— 在本例中,仅为 String 消息输入定义了该函数。)为这个 actor
所实现的处理方法是,只要收到一条 String 消息,就使用该消息值打印一条问候语。

只通过消息与 actor 通信

举个例子,一个 GoodActor可能会在发往 BadActor
的消息中包含一个指向自己的引用,来表明作为消息源的自己。如果 BadActor
调用了 GoodActor 的某个任意的方法而不是通过 “!”
发送消息的话,问题就来了。被调用的方法可能读到 GoodActor
的私有实例数据,而这些数据可能是由另一个线程写进去。结果是,你需要确保
BadActor 线程对这些实例数据的读取和 GoodActor
线程对这些数据的写入是同步在一个锁上的。一旦绕开了 actor
之间的消息传递机制,就回到了共享数据和锁模型中。

Java 中的 Hello

清单 2 给出了清单 1 中的 Akka Hello 在普通 Java 中的表示。

优选不可变的消息

由于 Scala 的 actor 模型提供了在每个 actor 的 act
方法中的单线程环境,不需要担心在这个方法的实现中使用的对象是否是线程安全的。

确保消息对象是线程安全的最佳途径是在消息中使用不可变对象。任何只有 val
字段且这些字段只引用到不可变对象的类的实例都是不可变的。

如果你发现自己有一个可变的对象,想继续使用它,同时也想用消息发送给另一个
actor,此时应该考虑制作并发送它的一个副本,比如利用 clone 方法。

清单 2. Java 中的 Hello
import akka.actor.*;

public class Hello1 {

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("actor-demo-java");
        ActorRef hello = system.actorOf(Props.create(Hello.class));
        hello.tell("Bob", ActorRef.noSender());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) { /* ignore */ }
        system.shutdown();
    }

    private static class Hello extends UntypedActor {

        public void onReceive(Object message) throws Exception {
            if (message instanceof String) {
                System.out.println("Hello " + message);
            }
        }
    }
}

清单 3 显示了来自包含 lambda 的 Java 8 的 actor 定义,以及 lambda
支持的 ReceiveBuilder 类所需要的导入。清单 3 或许更加紧凑,但与清单 2
大同小异。

让消息自包含

向某个 actor 发送消息,如果你想得到这个 actor
的回复,可以在消息中包含自身。示例如下:

class ActorItem extends Actor { def act(): Unit = { react { case (name: String, actor: Actor) => { println actor ! "Done" } } }}object Test { def main (args: Array[String]): Unit = { val actorItem = new ActorItem actorItem.start() actorItem ! ("scala", self) receive { case msg => println } }}

输出:scalaDone
清单 3. Java 8 的 Akka Hello 版本
import akka.japi.pf.ReceiveBuilder;
...
    private static class Hello extends AbstractActor {

        public Hello() {
            receive(ReceiveBuilder.
                match(String.class, s -> { System.out.println("Hello " + s); }).
                build());
        }
    }

与清单 2 相比,清单 3 中的 Java 8 代码使用了一个不同的基类
AbstractActor 代替 UntypedActor
而且还使用了一种不同的方式来定义消息处理方案。ReceiveBuilder 类允许您使用
lambda 表达式来定义消息的处理方式,并采用了类似 Scala
的匹配语法。如果您主要在 Scala 中进行开发工作,此技术可能有助于让 Java
Akka 代码看起来更简洁,但使用 Java 8 特定版本的好处似乎有些微不足道。

使用样本类

在上例中,若把(name: String, actor: Actor)定义成类,代码可读性会大大提高

case class Info(name: String, actor: Actor)class ActorItem extends Actor { def act(): Unit = { react { case Info => { println( Info.name ) actor ! "Done" } } }}

**传送门: **Scala 在简书目录

欢迎关注我的微信公众号:FunnyBigData

澳门新葡亰网站注册 2FunnyBigData

为什么还要等待?

在主应用程序代码中,将消息发送到 actor
之后,会有一次 Thread sleep 1000 形式的等待,然后才会关闭系统。您可能想知道为什么需要等待。毕竟,消息很容易处理;难道消息没有立即传到
actor,在 hello !"Bob" 语句完成时还在处理当中?

这个问题的答案很简单:“不是”。Akka actor 是异步运行的,所以即使目标
actor 与发送方 actor 位于相同的 JVM 中,目标 actor
也绝不会立即开始执行。相反,处理该消息的线程会将消息添加到目标 actor
的邮箱中。将消息添加到邮箱中会触发一个线程,以便从邮箱获取该消息并调用
actor
的 receive 方法来处理。但从邮箱获取消息的线程通常不同于将消息添加到邮箱的线程。

消息传送时间和保证

“为什么还要等待?” 这一问题的简短答案的背后是一种更深入的原理。Akka 支持
actor
远程通信且具有位置透明性,意味着您的代码没有任何直接的方式来了解一个特定的
actor 是位于同一 JVM
中,还是在系统外的云中某处运行。但这两种情况在实际操作中显然具有完全不同的特征。

“Akka 无法保证消息将被传送到目的地。这种无保证传送背后的哲学原理是
Akka 的核心原理之一。 ”

一个差别与消息丢失有关。Akka
无法保证消息将被传送到目的地,熟悉消息传递系统(用于连接应用程序)的开发人员可能对此很吃惊。这种无保证传送背后的哲学原理是
Akka
的核心原理之一:针对失败而设计。作为一种有意为之的过度简化,可以认为传送保证为消息传输系统添加了很高的复杂性,而且这些更复杂的系统有时无法按预期运行,而应用程序代码还必须涉及恢复操作。这种原理在应用程序代码始终自行处理传送失败情况时很有意义,能够让消息传送系统保持简单。

Akka 可以 保证消息最多传送一次,而且绝不会无序地收到从一个 actor
实例发送到另一个 actor 实例的消息。但后者仅适用于特定的 actor
对,二者没有联系。如果 actor A 将消息发送给 actor
B,这些消息绝不会被打乱顺序。如果 actor A 将消息发送给 actor
C,情况也是如此。但是,如果 actor B 也将消息发送给 actor C(例如将来自 A
的消息转发给 C),B 的消息相对于来自 A 的消息而言可能是乱序的。

在 清单
1 的代码中,消息丢失的概率非常低,因为代码在单个
JVM
中运行,不会生成过多的消息负载。(过多的消息负载可能导致消息丢失。如果
Akka 没有空间来存储消息,例如它没有备用方案,那么只能丢弃消息。)但清单
1 代码的结构仍未对消息传送事件做出任何假设,而且允许 actor
系统执行异步操作。

Actor 和状态

Akka 的 actor 模型很灵活,支持所有类型的 actor。可以使用没有状态信息的
actor(就像 Hello1 示例中一样),但这些 actor
可能等效于方法调用。添加状态信息可实现更为灵活的 actor 函数。

清单
1 提供了一个完整的(但很普通)actor
系统示例 — 但拥有一个始终执行同一工作的
actor。每位演员都讨厌反复重复同一句话,所以清单 4 向 actor
添加了一些状态信息,使工作变得更有趣。

清单 4. Polyglot Scala hello
object Hello2 extends App {

  case class Greeting(greet: String)
  case class Greet(name: String)

  val system = ActorSystem("actor-demo-scala")
  val hello = system.actorOf(Props[Hello], "hello")
  hello ! Greeting("Hello")
  hello ! Greet("Bob")
  hello ! Greet("Alice")
  hello ! Greeting("Hola")
  hello ! Greet("Alice")
  hello ! Greet("Bob")
  Thread sleep 1000
  system shutdown

  class Hello extends Actor {
    var greeting = ""
    def receive = {
      case Greeting(greet) => greeting = greet
      case Greet(name) => println(s"$greeting $name")
    }
  }
}

清单 4 中的 actor
知道如何处理两种不同类型的消息,这些消息在清单的开头附近定义:Greeting 消息和 Greet 消息,每条消息都包装了一个字符串值。修改后的 Hello actor
收到 Greeting 消息时,会将所包装的字符串保存为 greeting 值。收到 Greet 消息时,则将已保存的
greeting
值与 Greet 字符串组合起来,形成最终的消息。下面在运行此应用程序时,我们可以看到在控制台中打印出的消息(但消息不一定是按此顺序出现的,因为
actor 执行顺序是不确定的):

Hello Bob
Hello Alice
Hola Alice
Hola Bob

清单 4 中并没有太多的新代码,所以我没有提供其 Java
版本。您可在代码下载内容中找到它们(参见 参考资料),名为com.sosnoski.concur.article5java.Hello2 和 com.sosnoski.concur.article5java8.Hello2