Основи роботи з акторами в akka, записки програміста

Akka - це бібліотека для мов Java і Scala, в якій реалізовані агенти, актори, взаємодія між акторами по мережі і багато інших удобняшкі. У першому наближенні це нагадує Erlang або Cloud Haskell. але насправді Akka куди могутніше. Наприклад, за допомогою Akka можна легко об'єднати декілька машин в кластер, в якому буде відслідковуватися зникнення і поява машин, а актори будуть автоматично переміщатися з машини на машину в міру зміни розмірів кластера. І це тільки один з прикладів. У цій замітці ми напишемо дуже просте додаток, що використовує Akka, щоб показати, наскільки легко працювати з цією бібліотекою.

Далі передбачається, що ви знаєте, що таке модель акторів, а також, що ви вмієте працювати з футуро в Scala.

Відкриємо build.sbt і підключимо Akka до нашого проекту:

libraryDependencies + =
"Com.typesafe.akka" %% "akka-actor"% "2.3.6"

Напишемо найпростіший актор, який зберігає пари ключ-значення:

case class SetRequest # 40; key. String, value. String # 41;
case class GetRequest # 40; key. String # 41;
case class GetResponse # 40; key. Option # 91; String # 93; # 41;

class MapActor extends Actor with ActorLogging # 123;
val state. mutable. Map # 91; String, String # 93; = Mutable. Map. empty

def receive = # 123;
case r. SetRequest =>
state + = r. key -> r. value
case r. GetRequest =>
sender. GetResponse # 40; state. get # 40; r. key # 41; # 41;
case r =>
log. warning # 40; s "Unexpected: $ r" # 41;
# 125;
# 125;

Як бачите, все досить просто. Отримуючи повідомлення типу SetRequest, актор зберігає в мапе пару ключ-значення. Якщо ж актору приходить GetRequest, він знаходить в мапе значення за поданою ключу і посилає відправнику GetResponse зі знайденим значенням типу Option # 91; String # 93; .

Для запуску акторів нам знадобиться створити ActorSystem:

object Example2 extends App # 123;
val system = ActorSystem # 40; "System" # 41;
val mainActor = system. actorOf # 40; Props # 40; new MainActor # 41 ;. "MainActor" # 41;
system. awaitTermination # 40; # 41;
# 125;

Тут ми створюємо один-єдиний актор MainActor і чекаємо завершення роботи ActorSystem. MainActor визначимо наступним чином:

case object Start

class MainActor extends Actor with ActorLogging # 123;
implicit val timeout = Timeout # 40; 5 seconds # 41;
val mapActor = context. actorOf # 40; Props # 40; new MapActor # 41 ;. "MapActor" # 41;

override def preStart # 40; # 41; # 123;
self. Start
# 125;

def receive = # 123;
case Start =>
mapActor. "Dummy request"
mapActor. SetRequest # 40; "Key". "Value" # 41;
val respF = mapActor. GetRequest # 40; "Key" # 41;
respF pipeTo self

case r. GetResponse =>
log. warning # 40; s "Response: $ r" # 41;
context. system. shutdown # 40; # 41;
# 125;
# 125;

Як бачите, тут створюється MapActor, який ми визначили раніше. Також при запуску MainActor посилає сам собі повідомлення Start (див метод preStart). Після отримання цього повідомлення актор шле MapActor'у три повідомлення. Посилка перших двох повідомлень відбувається за принципом fire and forget. А ось при посилці запиту GetRequest актор очікує отримати якусь відповідь, тому замість. (Вимовляється «tell» або «bang») використовується оператор. (Вимовляється «ask»).

Останній повертає Футура з відповіддю сервера respF. Таким чином, ніякого блокування не відбувається, на відміну, наприклад, від gen_server: call в Erlang. Далі за допомогою комбінатора pipeTo ми пересилаємо відповідь, коли він стане доступний, самі собі. Само собою зрозуміло, pipeTo працює не тільки з self, а й з будь-якими іншими акторами. Коли MainActor нарешті отримує відповідь (GetResponse), він просто виводить його в лог і потім зупиняє ActorSystem.

До речі, саме для оператора. потрібен неявний (implicit) аргумент timeout.

Код програми цілком:

package me. eax. akka_examples

import akka. actor. _
import akka. pattern. # 123; ask, pipe # 125;
import akka. util. Timeout

import scala. collection. _
import scala. concurrent. duration. _
import scala. concurrent. ExecutionContext. Implicits. global

case class SetRequest # 40; key. String, value. String # 41;
case class GetRequest # 40; key. String # 41;
case class GetResponse # 40; key. Option # 91; String # 93; # 41;

class MapActor extends Actor with ActorLogging # 123;
val state. mutable. Map # 91; String, String # 93; = Mutable. Map. empty

def receive = # 123;
case r. SetRequest =>
state + = r. key -> r. value
case r. GetRequest =>
sender. GetResponse # 40; state. get # 40; r. key # 41; # 41;
case r =>
log. warning # 40; s "Unexpected: $ r" # 41;
# 125;
# 125;

case object Start

class MainActor extends Actor with ActorLogging # 123;
implicit val timeout = Timeout # 40; 5 seconds # 41;
val mapActor = context. actorOf # 40; Props # 40; new MapActor # 41 ;. "MapActor" # 41;

override def preStart # 40; # 41; # 123;
self. Start
# 125;

def receive = # 123;
case Start =>
mapActor. "Dummy request"
mapActor. SetRequest # 40; "Key". "Value" # 41;
val respF = mapActor. GetRequest # 40; "Key" # 41;
respF pipeTo self

case r. GetResponse =>
log. warning # 40; s "Response: $ r" # 41;
context. system. shutdown # 40; # 41;
# 125;
# 125;

object Example2 extends App # 123;
val system = ActorSystem # 40; "System" # 41;
val mainActor = system. actorOf # 40; Props # 40; new MainActor # 41 ;. "MainActor" # 41;
system. awaitTermination # 40; # 41;
# 125;

Висновок програми (таймстемпи і інформація про диспетчерів опущена):

[Akka: // system / user / mainActor / mapActor] Unexpected: dummy request
[Akka: // system / user / mainActor] Response: GetResponse (Some (value))

А чи не працює випадково з Akka хтось із читачів?

Сподобався пост? Поділися з іншими:

Схожі статті