博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
顺序执行到来的消息 actor
阅读量:7109 次
发布时间:2019-06-28

本文共 2004 字,大约阅读时间需要 6 分钟。

在某项目里,有个 actor 需要做一些持久化的操作,这些操作耗时比较久,理应使用异步的代码来写,但是需求又强调每次只能做一个持久化操作,后来的请求应该等待。一个显然的做法是阻塞式的写,这样就能比较简单的实现顺序花操作。

 

代码写完以后,我觉得在 actor 中 block 不够完美,就想其他的解决方案。实际上,借助 akka actor 的一些函数,可以实现在不阻塞的情况下实现顺序执行请求的功能的。这种办法的核心是使用 become, unbecome 函数:actor 设置两种状态 free 和 busy,当 free 的时,处理消息,当 busy 时,暂时将消息存储起来,处理消息后,给 actor 返回 done 指令,actor 的状态重新返回到 free,准备处理下一个请求。具体的实现又有很多细节可以考虑,比如当 busy 时到来的请求存储到哪里,是 stash 起来还是在 actor 内部维护一个 queue。请求的处理逻辑是写在 actor 内部,还是借鉴 cameo 模式,再创建一个 actor。

 

网上已有一种实现,我看了下,觉得应该没有问题。只不过 actor 内部维护了一个 queue,这可能会造成 actor 死亡后重启数据丢失的情况。更好的办法应该是 cameo 模式创建新的 actor 来处理可能出现异常(危险)的工作,其次是把 actor 的 mailbox 当做那个 queue,不要自己维护,按照 doc 缩写,actor 重启后,mailbox 的消息不会丢失。

 

package actorsimport scala.concurrent.Futureimport scala.concurrent.ExecutionContext.Implicits.globalimport akka.actor.{ Actor, ActorRef }import play.api.libs.concurrent.Akkaimport play.api.Loggerimport play.api.Play.currenttrait SequentialActor {  this: Actor =>  import SequentialActor._  // Actor defines type Receive as PartialFunction[Any, Unit]  type ReceiveAsync = PartialFunction[Any, Future[_]]  private val queue = scala.collection.mutable.Queue[Job]()  private def enqueue(job: Job): Unit = queue enqueue job  private def dequeue: Option[Job] = if (queue.isEmpty) None else Some(queue.dequeue)  private var _senderAsync: ActorRef = _  def senderAsync = _senderAsync  def receive: Receive = {    case msg =>      context become busy      process(Job(msg, sender))  }  def busy: Receive = {    case Done =>      dequeue match {        case None => context.unbecome        case Some(job) => process(job)      }    case msg =>      enqueue(Job(msg, sender))  }  def process(job: Job) {    _senderAsync = job.sender    (receiveAsync orElse fallback)(job.msg).onComplete { _ => self ! Done }  }  def receiveAsync: ReceiveAsync  def fallback: ReceiveAsync = {    case msg =>      Logger.error(s"Unhandled message: $msg")      Future.successful{ () }  }}object SequentialActor {  case object Done  case class Job(msg: Any, sender: ActorRef)}

  

转载地址:http://xplhl.baihongyu.com/

你可能感兴趣的文章
2.1分层数据流
查看>>
laravel创建新的提交数据
查看>>
FineBI学习系列之FineBI的ETL处理(图文详解)
查看>>
Java 8 新特性
查看>>
Windows启动配置数据(BCD)存储文件包含一些无效信息
查看>>
slim请求参数获取
查看>>
MySQL主从介绍 准备工作 配置主 配置从 测试主从同步
查看>>
CSS------当内容超出div宽度后自动换行和限制文字不超出div宽度和高度
查看>>
要恢复页面吗?Chrome未正确关闭
查看>>
hbs模板(zmaze ui用的)
查看>>
使用ASP.NET SignalR实现一个简单的聊天室
查看>>
Spring3.1 对Bean Validation规范的新支持(方法级别验证)
查看>>
基于Docker搭建MySQL主从复制
查看>>
005 使用SpringMVC开发restful API三--处理创建请求
查看>>
手机Soc芯片简介
查看>>
Gradle Java Web应用程序并在Tomcat上运行
查看>>
WPF 关于圆角的制作
查看>>
前端性能优化之WebP
查看>>
android studio 各种问题 应该能帮助到你们
查看>>
.Net 鉴权授权
查看>>