Skip to main content

Actor Model

·297 words·2 mins
WFUing
Author
WFUing
A graduate who loves coding.
Table of Contents

CPU 上有多个内核。如果我们想充分利用现有的这些硬件,就需要一种并发运行代码的方法。数十年来无法追踪的错误和开发人员的沮丧都表明,线程并不是解决问题的办法。不过不用担心,我们还有其他很好的选择,今天我要向你展示的就是其中之一:actor model。

actor model
#

actor model 是一种处理并发计算的概念模型。它为系统组件的行为和交互方式定义了一些通用规则。

actors
#

actor 是计算的原始单元。它接收 message,并根据 message进行某种计算。

这种想法与面向对象语言(object-oriented languages)中的想法非常相似:对象接收 message(方法调用),并根据接收到的 message(我们调用的方法)进行操作。

主要区别在于,actors 之间是完全隔离的,它们永远不会共享内存。值得注意的是,一个 actor 可以保持一个私有状态,其他 actor 永远无法直接改变该状态。

一个 actor 不是 actor。它们是以系统的形式出现的。在 actor model 中,一切都是 actor,它们需要有地址,这样一个行为者才能向另一个 actor 发送 message。

mailbox
#

虽然多个 actor 可以同时运行,但一个 actor 会按顺序处理给定的 message。这意味着,如果你向同一个 actor 发送 3 条 message,它只会一次执行一条。要同时执行这 3 条 message,你需要创建 3 个 actor,每个 actor 发送一条 message。

message 是异步发送给角色的,角色在处理另一条消息时需要将消息存储在某个地方。mailbox 就是存储这些 message 的地方。

mailbox

actor 之间通过发送异步消息进行通信。这些 message 会保存在其他 actor 的 mailbox 中,直到它们被处理。

What actors do
#

当 actor 收到 message 时,它可以做以下三件事中的一件:

  • Create more actors
  • Send messages to other actors
  • Designate what to do with the next message:指定义这个状态在收到下一条信息时的样子,行为体如何改变状态。假设我们有一个行为类似于计算器的行为体,它的初始状态是简单的数字 0。当这个行为体收到 add(1) 消息时,它不会改变自己的原始状态,而是指定在收到下一条消息时,状态将是 1。

Fault tolerance
#

Erlang 引入了 “let it crash” 的理念。其理念是,你不需要进行防御性编程,试图预测所有可能发生的问题,并找到处理它们的方法,因为根本不可能考虑到每一个故障点。

Erlang 所做的就是简单地让它崩溃,但让这些关键代码由某个人监管,而这个人唯一的责任就是知道当崩溃发生时该做什么(比如将代码单元重置为稳定状态),而使这一切成为可能的就是 actor model。

每段代码都运行在一个进程中(这也是 Erlang 对其角色的基本称呼)。这个进程是完全孤立的,这意味着它的状态不会影响任何其他进程。我们有一个 “监督者”,它基本上是另一个进程(所有东西都是行为体,还记得吗?),当被监督的进程崩溃时,它会收到通知,然后可以采取一些措施。

这就使得创建 “self heal” 系统成为可能,也就是说,如果一个行为体由于某种原因进入了异常状态并崩溃,那么监管者就可以采取一些措施,尝试将其恢复到一致的状态(有多种策略可以做到这一点,最常见的就是以初始状态重新启动行为体)。

Actor Model For IoT
#

物联网(IoT)由许多节点组成,通常功能有限。通过互联网协议标准进行通信的小型软件组件通常在机器之间形成高度分布式的工作流程,人与机器之间的互动极少。一般的应用场景包括监控环境条件等数据的传感器。复杂的应用则使用传感器和执行器,例如:家庭自动化和健康数据跟踪。这些系统使机器能够将数据上传到互联网服务器。因此,它们可以随时随地跟踪数据。

典型 IoT 系统的主要特点之一是涉及大量受管设备,每个设备的内部状态都在不断变化。在许多情况下,这些设备都是在一些简单的网络协议上运行的原始硬件。这种 “极简” 要求与 actor model 非常吻合,因为 actor model 的基本原则之一就是将业务逻辑分解成最小的任务,由各个 actor 来处理。

actor 具有 delivery guarantees 和 isolation 特性,非常适合物联网世界,是模拟数百万个并发连接的传感器生成实时数据的绝佳工具。它们设计轻巧,因此可以在不消耗过多计算资源的情况下进行扩展。

以下是行动者适合物联网的特征属性:

  • Scalability:物联网带来了许多挑战,如何处理所有同时连接的设备产生的大量数据,并对其进行检索、汇总、分析和推送,同时保持设备的响应速度。面临的挑战包括管理高峰期接收传感器数据的巨大突发流量、批处理和实时处理这些海量数据,以及进行模拟真实世界使用模式的大规模仿真。一些物联网部署还要求后端服务管理设备,而不仅仅是吸收设备发送的数据。管理这一切的后端系统需要能够按需扩展,并具有完全的弹性。这非常适合 reactive architectures ,尤其是 Akka。

  • Concurrency:物联网应用网关是系统中将本地传感器和执行器连接到云的点(例如路边站、运输过程中的车载设备或家庭自动化网关)。即使一个应用程序在传感器、执行器和云服务之间 “只转发数据”,也会有并发事件。物联网应用网关需要处理在其环境中发生的事件流和到达其接口的数据流。环境以自己的速度产生数据并要求输出。Actor model 通过消息传递实现了对来自设备的消息的高性能并发处理,从而解决了上述问题。message-processing models的优势之一是,传统的并发问题(主要是共享状态的同步)不再是问题。行为体可以保留设备内部状态或活动会话等私有状态,并在没有锁的情况下自由更新。Actor model 可确保一次只处理一条消息。

  • Fault Tolerance:在构建可能被数百万联网设备使用的服务时,您需要一个应对信息流的模型。您需要对设备故障、信息丢失和服务失败时的情况进行抽象。今天,我们常常认为调用堆栈是理所当然的。但是,它们发明的年代,由于多 CPU 系统并不常见,并发编程并不那么重要。调用栈不能跨线程,因此不能模拟异步调用链。

上图显示了一个严重的问题。工作线程如何处理这种情况?它很可能无法解决问题,因为它通常不知道失败任务的目的。调用者 “线程需要得到通知,但没有调用栈可以释放异常。失败通知只能通过侧通道完成,例如,在 “调用者 “线程希望得到结果的地方放置一个错误代码。如果没有这种通知,“调用者 “就永远不会收到失败通知,任务也就丢失了!这与网络系统的工作原理惊人地相似,在网络系统中,信息/请求可能在没有任何通知的情况下丢失/失败。

有了 actor,我们可以将 actor 组织成监管层次,因此,单个 actor 的错误不会导致整个系统瘫痪。

  • LightWeight:基准测试表明,Akka 模型每千兆字节堆内存可处理 250 万个角色,单机每秒可处理 5000 万条消息。

  • Network Protocol Decoupling:利用 actor model ,我们可以利用容错功能,将代表设备的角色与底层通信协议分离开来。这样,代表设备和设备状态的角色就可以从代表通信协议的 actor 中分离出来,从而使设备 actor 免受网络错误的影响,并提高各个 actor 的功能一致性。

  • Non-blocking communications:物联网应用 “必须 “具有反应性和异步性。大多数物联网应用程序都应能够处理来自设备的许多连接以及从设备中获取的所有信息。异步消息传递广泛应用于机器对机器通信。异步通信具有灵活性:应用程序可以发送一条信息,然后继续处理其他事情。actor 是唯一可寻址的,拥有自己独立的邮箱或消息队列。它们通过消息传递支持非阻塞通信,因此适合构建非阻塞和分布式计算系统。

  • Customization:所有行为体都有一个定义明确的生命周期,并配有精致的钩子,如用于生命周期逻辑控制的 preStart()、postRestart() 和 postStop()。在模拟物联网设备时,可以轻松地将自定义初始化和终止例程锚定到相应的钩子上。

object Device {
  def props(deviceType: String, mqttPubSub: ActorRef) = //...
}

class Device(deviceType: String, mqttPubSub: ActorRef) extends Actor {
  import Device._
  
  private var opState: OpState = InitialState(deviceType)
  override def preStart(): Unit = //Initialize device's op-state...
  override def postStop(): Unit = //Reset/Shutdown device...
  
  def receive = {
    case ReportOpState =>
       //Assemble report data with OpState
       mqttPubSub ! new Publish(Mqtt.topicReport, reportData)
    case UpdateOpState(newState) =>
       //Update opState with newState
       mqttPubSub ! new Publish(Mqtt.topicUpdate, updateResult)
    case PowerOff =>
       //Shutdown device...
  }
}
view raw

上面的片段展示了如何在 Scala/Akka 中构建一个设备角色,使用行业标准 MQTT(消息队列遥测传输)发布-订阅消息协议向订阅者发布其运行状态信息。这里的目的并不是研究如何用 Scala 或 Akka 编程,而是提供一个简单的示例,说明 Akka 角色易于理解的逻辑流程。

Resources
#


💬评论