順序執行到來的訊息 actor

2021-09-07 01:45:16 字數 1215 閱讀 2313

在某專案裡,有個 actor 需要做一些持久化的操作,這些操作耗時比較久,理應使用非同步的**來寫,但是需求又強調每次只能做乙個持久化操作,後來的請求應該等待。乙個顯然的做法是阻塞式的寫,這樣就能比較簡單的實現順序花操作。

**寫完以後,我覺得在 actor 中 block 不夠完美,就想其他的解決方案。實際上,借助 akka actor 的一些函式,可以實現在不阻塞的情況下實現順序執行請求的功能的。這種辦法的核心是使用 become, unbecome 函式:actor 設定兩種狀態 free 和 busy,當 free 的時,處理訊息,當 busy 時,暫時將訊息儲存起來,處理訊息後,給 actor 返回 done 指令,actor 的狀態重新返回到 free,準備處理下乙個請求。具體的實現又有很多細節可以考慮,比如當 busy 時到來的請求儲存到**,是 stash 起來還是在 actor 內部維護乙個 queue。請求的處理邏輯是寫在 actor 內部,還是借鑑 cameo 模式,再建立乙個 actor。

網上已有一種實現,我看了下,覺得應該沒有問題。只不過 actor 內部維護了乙個 queue,這可能會造成 actor 死亡後重啟資料丟失的情況。更好的辦法應該是 cameo 模式建立新的 actor 來處理可能出現異常(危險)的工作,其次是把 actor 的 mailbox 當做那個 queue,不要自己維護,按照 doc 縮寫,actor 重啟後,mailbox 的訊息不會丟失。

package actors

import scala.concurrent.future

import scala.concurrent.executioncontext.implicits.global

import akka.actor.

import play.api.libs.concurrent.akka

import play.api.logger

import play.api.play.current

trait sequentialactor

def busy: receive =

case msg =>

enqueue(job(msg, sender))

} def process(job: job)

} def receiveasync: receiveasync

def fallback: receiveasync =

}}object sequentialactor

scala的Actor持續接收訊息

持續接收訊息 通過上乙個案例,actorreceiver呼叫receive來接收訊息,但接收一次後,actor就退出了。object actorsender extends actor object actorreceiver extends actor object actormsgdemo 上述...

如何保證訊息佇列順序執行?

其實這個也是用 mq 的時候必問的話題,第一看看你了不了解順序這個事兒?第二看看你有沒有辦法保證訊息是有順序的?這是生產系統中常見的問題。我舉個例子,我們以前做過乙個 mysqlbinlog同步的系統,壓力還是非常大的,日同步資料要達到上億,就是說資料從乙個 mysql 庫原封不動地同步到另乙個 m...

Rabbitmq如何保證訊息順序執行

訊息佇列中的若干訊息如果是對同乙個資料進行操作,這些操作具有前後的關係,必須要按前後的順序執行,否則就會造成資料異常。舉例 比如通過mysql binlog進行兩個資料庫的資料同步,由於對資料庫的資料操作是具有順序性的,如果操作順序搞反,就會造成不可估量的錯誤。比如資料庫對一條資料依次進行了 插入 ...