From 456657f5e8247f294daa7774c5c8d35589c7ff0e Mon Sep 17 00:00:00 2001 From: Magnus Madsen Date: Wed, 3 Dec 2025 13:40:55 +0100 Subject: [PATCH 1/2] refactor: support Flix 0.67.1 --- CLAUDE.md | 43 ++ examples/Become.flix | 59 --- examples/Composition.flix | 130 ------ examples/Countdown.flix | 52 --- examples/Counter.flix | 89 ---- examples/HelloWorld.flix | 41 -- examples/LoadBalancer.flix | 122 ------ examples/Nesting.flix | 69 ---- examples/PingPong.flix | 76 ---- examples/RestartAndStop.flix | 104 ----- examples/RestartOnce.flix | 54 --- examples/RestartOncePer5Min.flix | 54 --- examples/RestartParent.flix | 72 ---- examples/StartupAndShutdown.flix | 100 ----- flix.toml | 6 + src/ActorBehavior.flix | 6 + src/ActorDirectory.flix | 33 ++ src/ActorPolicy.flix | 150 +++++++ src/{main/actor => }/ActorResult.flix | 4 +- src/ActorSystem.flix | 560 ++++++++++++++++++++++++++ src/Mailbox.flix | 5 + src/Main.flix | 56 +++ src/{main/actor => }/Message.flix | 10 +- src/Ready.flix | 6 + src/{main/actor => }/State.flix | 10 +- src/{main/actor => }/SystemEvent.flix | 30 +- src/example/Become.flix | 70 ++++ src/example/Composition.flix | 125 ++++++ src/example/Countdown.flix | 52 +++ src/example/Counter.flix | 93 +++++ src/example/HelloWorld.flix | 42 ++ src/example/LoadBalancer.flix | 132 ++++++ src/example/Nesting.flix | 85 ++++ src/example/PingPong.flix | 78 ++++ src/example/RestartAndStop.flix | 108 +++++ src/example/RestartOnce.flix | 60 +++ src/example/RestartOncePer5Min.flix | 64 +++ src/example/RestartParent.flix | 86 ++++ src/example/StartupAndShutdown.flix | 131 ++++++ src/main/actor/ActorBehavior.flix | 4 - src/main/actor/ActorDirectory.flix | 38 -- src/main/actor/ActorPolicy.flix | 191 --------- src/main/actor/ActorSystem.flix | 479 ---------------------- src/main/actor/Mailbox.flix | 4 - src/main/actor/Ready.flix | 6 - test/TestMain.flix | 2 + 46 files changed, 2021 insertions(+), 1770 deletions(-) create mode 100644 CLAUDE.md delete mode 100644 examples/Become.flix delete mode 100644 examples/Composition.flix delete mode 100644 examples/Countdown.flix delete mode 100644 examples/Counter.flix delete mode 100644 examples/HelloWorld.flix delete mode 100644 examples/LoadBalancer.flix delete mode 100644 examples/Nesting.flix delete mode 100644 examples/PingPong.flix delete mode 100644 examples/RestartAndStop.flix delete mode 100644 examples/RestartOnce.flix delete mode 100644 examples/RestartOncePer5Min.flix delete mode 100644 examples/RestartParent.flix delete mode 100644 examples/StartupAndShutdown.flix create mode 100644 flix.toml create mode 100644 src/ActorBehavior.flix create mode 100644 src/ActorDirectory.flix create mode 100644 src/ActorPolicy.flix rename src/{main/actor => }/ActorResult.flix (95%) create mode 100644 src/ActorSystem.flix create mode 100644 src/Mailbox.flix create mode 100644 src/Main.flix rename src/{main/actor => }/Message.flix (60%) create mode 100644 src/Ready.flix rename src/{main/actor => }/State.flix (82%) rename src/{main/actor => }/SystemEvent.flix (68%) create mode 100644 src/example/Become.flix create mode 100644 src/example/Composition.flix create mode 100644 src/example/Countdown.flix create mode 100644 src/example/Counter.flix create mode 100644 src/example/HelloWorld.flix create mode 100644 src/example/LoadBalancer.flix create mode 100644 src/example/Nesting.flix create mode 100644 src/example/PingPong.flix create mode 100644 src/example/RestartAndStop.flix create mode 100644 src/example/RestartOnce.flix create mode 100644 src/example/RestartOncePer5Min.flix create mode 100644 src/example/RestartParent.flix create mode 100644 src/example/StartupAndShutdown.flix delete mode 100644 src/main/actor/ActorBehavior.flix delete mode 100644 src/main/actor/ActorDirectory.flix delete mode 100644 src/main/actor/ActorPolicy.flix delete mode 100644 src/main/actor/ActorSystem.flix delete mode 100644 src/main/actor/Mailbox.flix delete mode 100644 src/main/actor/Ready.flix create mode 100644 test/TestMain.flix diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..84e0f84 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,43 @@ +# BibBlitz + +A DOI-to-BibTeX converter written in Flix that fetches bibliographic metadata from the CrossRef API. + +## Flix Resources + +Use these resources to lookup how to use Flix. + +### Flix Documentation + +The Flix documentation can be found at https://doc.flix.dev/ + +### Flix API + +The Flix API can be found at https://api.flix.dev/ + +### Flix Examples + +Useful Flix examples can be found at https://github.com/flix/flix/tree/master/examples + +### JSON API + +The JSON API can be found at https://github.com/mlutze/flix-json/activity + +## Development Workflow + +### Checking Compilation + +Check if the project compiles by running: + +```bash +java -jar flix.jar check +``` + +IMPORTANT: Always run this command after every refactor to ensure the code compiles correctly. + +### Running the Program + +Run the main program with: + +```bash +java -jar flix.jar run +``` \ No newline at end of file diff --git a/examples/Become.flix b/examples/Become.flix deleted file mode 100644 index 2cd9cfe..0000000 --- a/examples/Become.flix +++ /dev/null @@ -1,59 +0,0 @@ -/// /// -/// /// An actor system that demonstrates how an actor can change its behavior (i.e. "become" another actor). -/// /// -/// namespace Become { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit & Impure = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(50i64); -/// ActorDirectory.send(sys, "greetingActor", Msg("HELO", Channel.new())); -/// ActorDirectory.send(sys, "greetingActor", Msg("HELO", Channel.new())); -/// ActorDirectory.send(sys, "greetingActor", Msg("HELO", Channel.new())); -/// ActorDirectory.send(sys, "greetingActor", Msg("HELO", Channel.new())); -/// ActorDirectory.send(sys, "greetingActor", Msg("HELO", Channel.new())); -/// ActorDirectory.send(sys, "greetingActor", Msg("HELO", Channel.new())); -/// <- Timer.milliseconds(50i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns the actor system. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "greetingActor" -> danishGreetingActor(2) -/// }; -/// { actors = actors | empty } -/// } - -/// /// -/// /// An actor that prints a greeting in Danish. -/// /// -/// pub def danishGreetingActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => c <- Ready; danishGreetingActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("HELO", _) => Console.printLine("Hejsa!"); -/// if (n > 0) -/// danishGreetingActor(n - 1, dir, mailbox) -/// else -/// swedishGreetingActor(dir, mailbox) -/// case Msg(msg, _) => ResumableCrash(danishGreetingActor(0), "Unexpected message: " + msg) -/// } - -/// /// -/// /// An actor that prints a greeting in Swedish. -/// /// -/// pub def swedishGreetingActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => c <- Ready; swedishGreetingActor(dir, mailbox) -/// case Stop => Done -/// case Msg("HELO", _) => Console.printLine("Hej då!"); swedishGreetingActor(dir, mailbox) -/// case Msg(msg, _) => ResumableCrash(swedishGreetingActor, "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/Composition.flix b/examples/Composition.flix deleted file mode 100644 index d7a4fad..0000000 --- a/examples/Composition.flix +++ /dev/null @@ -1,130 +0,0 @@ -/// /// -/// /// An actor system that demonstrates how we can compose two actor systems into a single one. -/// /// -/// namespace ActorSystemComposition { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit & Impure = { -/// // Composes the two systems together. -/// // TODO: the systems are copy-pasted from Counter.flix and Countdown.flix, they should probably just be imported? -/// let composed = ActorSystem.compose(system1(), system2()); -/// match composed { -/// case Some(s) => -/// let sys = ActorSystem.start(s); -/// <- Timer.milliseconds(2000i64); -/// ActorSystem.shutdown(sys) -/// case None => () -/// } -/// } - -/// /// -/// /// Returns an actor system with one counter actor and multiple worker actors. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system1(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "countdown" -> countdownActor(11i32) -/// }; -/// { actors = actors | empty } -/// } - -/// /// -/// /// An actor that performs a countdown and then explodes. -/// /// -/// pub def countdownActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => -/// Console.printLine("Ready..."); -/// c <- Ready; -/// mailbox <- Msg("TICK", mailbox); -/// countdownActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("TICK", _) => -/// if (n == 0) { -/// Console.printLine("KABOOM!"); -/// NonResumableCrash("KABOOM!") -/// } else { -/// Console.printLine("Tick-Tock"); -/// mailbox <- Msg("TICK", mailbox); -/// countdownActor(n - 1, dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(countdownActor(n), "Unexpected message: " + msg) -/// } - -/// /// -/// /// Returns an actor system with one counter actor and multiple worker actors. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system2(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "counter" -> counterActor(0i32), -/// "worker1" -> workerActor(None), -/// "worker2" -> workerActor(None) -/// }; -/// let dependencies = #{ -/// DependsOn("worker1", "counter"). -/// DependsOn("worker2", "counter"). -/// }; -/// { actors = actors, startPolicy = empty.startPolicy <+> dependencies | empty } -/// } - -/// /// -/// /// An actor that maintains an internal counter. -/// /// -/// /// The counter can be incremented, decrement, and its value retrieved. -/// /// -/// /// The actor non-resumably crashes if the counter overflows. -/// /// The actor resumably crashes if the counter becomes negative. -/// /// -/// /// -/// pub def counterActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => c <- Ready; counterActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("GET", c) => -/// c <- Msg("", mailbox); // TODO: Here we should reply with the internal number. -/// counterActor(n, dir, mailbox) -/// case Msg("INC", _) => -/// if (n < 2147483647) -/// counterActor(n + 1, dir, mailbox) -/// else -/// NonResumableCrash("Counter overflowed.") -/// case Msg("DEC", _) => -/// if (n == 0) -/// ResumableCrash(counterActor(n), "Counter about to become negative.") -/// else -/// counterActor(n - 1, dir, mailbox) -/// case Msg(msg, _) => ResumableCrash(counterActor(n), "Unexpected message: " + msg) -/// } - -/// /// -/// /// An actor that requests the counter actor to increment its number. -/// /// -/// pub def workerActor(counterActor: Option[Mailbox], dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => -/// let counter = ActorDirectory.getActor(dir, "counter"); -/// mailbox <- Msg("WAKEY-WAKEY", mailbox); -/// mailbox <- Msg("WAKEY-WAKEY", mailbox); -/// mailbox <- Msg("WAKEY-WAKEY", mailbox); -/// c <- Ready(); -/// workerActor(counter, dir, mailbox) -/// case Stop => Done -/// case Msg("WAKEY-WAKEY", _) => -/// match counterActor { -/// case None => NonResumableCrash("Where did the counterActor go?") -/// case Some(c) => -/// c <- Msg("INC", mailbox); -/// workerActor(counterActor, dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(workerActor(counterActor), "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/Countdown.flix b/examples/Countdown.flix deleted file mode 100644 index e3a37fd..0000000 --- a/examples/Countdown.flix +++ /dev/null @@ -1,52 +0,0 @@ -/// /// -/// /// An actor system with a single actor that performs a countdown and then explodes. -/// /// The actor system then restarts it for it to happen all over again. -/// /// -/// namespace Countdown { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit & Impure = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(50i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with one counter actor and multiple worker actors. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system(): ActorSystem & Impure = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "countdown" -> countdownActor(11i32) -/// }; -/// { actors = actors | empty } -/// } - -/// /// -/// /// An actor that performs a countdown and then explodes. -/// /// -/// pub def countdownActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => -/// Console.printLine("Ready..."); -/// c <- Ready; -/// mailbox <- Msg("TICK", mailbox); -/// countdownActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("TICK", _) => -/// if (n == 0) { -/// Console.printLine("KABOOM!"); -/// NonResumableCrash("KABOOM!") -/// } else { -/// Console.printLine("Tick-Tock"); -/// mailbox <- Msg("TICK", mailbox); -/// countdownActor(n - 1, dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(countdownActor(n), "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/Counter.flix b/examples/Counter.flix deleted file mode 100644 index 7969154..0000000 --- a/examples/Counter.flix +++ /dev/null @@ -1,89 +0,0 @@ -/// /// -/// /// A simple actor system that consists of: -/// /// -/// /// - 1x counterActor that maintains an internal counter. -/// /// - 2x workerActors that increment the counter. -/// /// -/// namespace Counter { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(100i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with one counter actor and multiple worker actors. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "counter" -> counterActor(0i32), -/// "worker1" -> workerActor(None), -/// "worker2" -> workerActor(None) -/// }; -/// let dependencies = #{ -/// DependsOn("worker1", "counter"). -/// DependsOn("worker2", "counter"). -/// }; -/// { actors = actors, startPolicy = empty.startPolicy <+> dependencies | empty } -/// } - -/// /// -/// /// An actor that maintains an internal counter. -/// /// -/// /// The counter can be incremented, decrement, and its value retrieved. -/// /// -/// /// The actor non-resumably crashes if the counter overflows. -/// /// The actor resumably crashes if the counter becomes negative. -/// /// -/// /// -/// pub def counterActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => c <- Ready; counterActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("GET", c) => -/// c <- Msg("", mailbox); // TODO: Here we should reply with the internal number. -/// counterActor(n, dir, mailbox) -/// case Msg("INC", _) => -/// if (n < 2147483647) -/// counterActor(n + 1, dir, mailbox) -/// else -/// NonResumableCrash("Counter overflowed.") -/// case Msg("DEC", _) => -/// if (n == 0) -/// ResumableCrash(counterActor(n), "Counter about to become negative.") -/// else -/// counterActor(n - 1, dir, mailbox) -/// case Msg(msg, _) => ResumableCrash(counterActor(n), "Unexpected message: " + msg) -/// } - -/// /// -/// /// An actor that requests the counter actor to increment its number. -/// /// -/// pub def workerActor(counterActor: Option[Mailbox], dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// let counter = ActorDirectory.getActor(dir, "counter"); -/// // TODO: Quentin: If I uncomment these lines then the system gets stuck. -/// //mailbox <- Msg("WAKEY-WAKEY", mailbox); -/// //mailbox <- Msg("WAKEY-WAKEY", mailbox); -/// //mailbox <- Msg("WAKEY-WAKEY", mailbox); -/// c <- Ready(); -/// workerActor(counter, dir, mailbox) -/// case Stop => Done -/// case Msg("WAKEY-WAKEY", _) => -/// match counterActor { -/// case None => NonResumableCrash("Where did the counterActor go?") -/// case Some(c) => -/// c <- Msg("INC", mailbox); -/// workerActor(counterActor, dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(workerActor(counterActor), "Unexpected message: " + msg) -/// } -/// } diff --git a/examples/HelloWorld.flix b/examples/HelloWorld.flix deleted file mode 100644 index 4792b91..0000000 --- a/examples/HelloWorld.flix +++ /dev/null @@ -1,41 +0,0 @@ -/// /// -/// /// An actor system with a single actor that prints hello and goodbye. -/// /// -/// namespace HelloWorld { - -/// /// -/// /// The main entry point for the single actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(100i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with one hello world actor. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "helloWorld" -> helloWorld -/// }; -/// { actors = actors | empty } -/// } - -/// /// -/// /// An actor that prints hello world and then terminates. -/// /// -/// pub def helloWorld(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// Console.printLine("Hello World!"); -/// c <- Ready; -/// helloWorld(dir, mailbox) -/// case Stop => -/// Console.printLine("Goodbye World!"); -/// Done -/// case Msg(msg, _) => ResumableCrash(helloWorld, "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/LoadBalancer.flix b/examples/LoadBalancer.flix deleted file mode 100644 index 567f0e0..0000000 --- a/examples/LoadBalancer.flix +++ /dev/null @@ -1,122 +0,0 @@ -/// /// -/// /// An actor system with an actor that sends pings to a load balancer. -/// /// -/// namespace LoadBalancer { - -/// /// -/// /// The main entry point for the single actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// // TODO: this can take a fair amount of time to initialize, maybe we want ActorSystem.start to notify us when the system is fully started? -/// <- Timer.milliseconds(1000i64); // wait for initialization -/// let c = Channel.new(); -/// if (???(sys, "ping", Msg("Start", c))) { // TODO: ActorSystem.send -/// <- c; // wait for finalization -/// () -/// } else { -/// () -/// }; -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with a ping actor and a pong actor. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "ping" -> pingActor(5), -/// "pongMaster" -> loadBalancer("pong1" :: "pong2" :: "pong3" :: Nil), -/// "pong1" -> pongActor, -/// "pong2" -> pongActor, -/// "pong3" -> pongActor -/// }; -/// let dependencies = #{ -/// DependsOn("pongMaster", "ping"). -/// DependsOn("pongMaster", "pong1"). -/// DependsOn("pongMaster", "pong2"). -/// DependsOn("pongMaster", "pong3"). -/// }; -/// { actors = actors, startPolicy = empty.startPolicy <+> dependencies | empty } -/// } - -/// /// -/// /// An actor that sends ping messages. -/// /// -/// pub def pingActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// c <- Ready; -/// pingActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("Start", c) => -/// mailbox <- Msg("Pong", c); -/// pingActor(n, dir, mailbox) -/// case Msg("Pong", c) if n == 0 => -/// Console.printLine("pingActor is done."); -/// c <- Msg("Done", c); // notifies that we're done -/// Done -/// case Msg("Pong", c) => -/// Console.printLine("Received Pong!"); -/// match ActorDirectory.getActor(dir, "pongMaster") { -/// case None => pingActor(n, dir, mailbox) -/// case Some(mb) => -/// mb <- Msg("Ping", c); -/// Console.printLine("Sent Ping!"); -/// pingActor(n - 1, dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(pingActor(n), "Unexpected message: " + msg) -/// } - -/// /// -/// /// An actor that sends pong messages when it receives ping messages, indefinitely. -/// /// -/// pub def pongActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// c <- Ready; -/// pongActor(dir, mailbox) -/// case Stop => Done -/// case Msg("Ping", c) => -/// Console.printLine("Received Ping!"); -/// match ActorDirectory.getActor(dir, "ping") { -/// case None => pongActor(dir, mailbox) -/// case Some(mb) => -/// mb <- Msg("Pong", c); -/// Console.printLine("Sent Pong!"); -/// pongActor(dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(pongActor, "Unexpected message: " + msg) -/// } - -/// pub def loadBalancer(actorNames: List[Str], dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// let actors = List.foldLeft((acc, name) -> -/// match ActorDirectory.getActor(dir, name) { -/// case Some(mb) => mb :: acc -/// case None => acc -/// }, Nil, actorNames); -/// c <- Ready; -/// if (List.isEmpty(actors)) { -/// NonResumableCrash("Where are my workers?") -/// } else { -/// loadBalancerInitialized(actors, dir, mailbox) -/// } -/// case Stop => Done -/// case Msg(_, _) => NonResumableCrash("I haven't been initialized properly!") -/// } -/// pub def loadBalancerInitialized(actors: List[Mailbox], dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(_) => NonResumableCrash("Wrong initialization") -/// case Stop => Done -/// case Msg(m, c) => -/// match actors { -/// case Nil => NonResumableCrash("Where are my workers?") -/// case worker :: _ => -/// worker <- Msg(m, c); -/// loadBalancerInitialized(List.rotateLeft(1, actors), dir, mailbox) -/// } -/// } -/// } diff --git a/examples/Nesting.flix b/examples/Nesting.flix deleted file mode 100644 index 8b99279..0000000 --- a/examples/Nesting.flix +++ /dev/null @@ -1,69 +0,0 @@ -/// /// -/// /// An actor system that demonstrates how we can nest two actor systems. This is different from composition, where two nested actor systems can share actor names (one name maps to a different actor in each system), and nesting two systems just means we have two different systems, while composing two systems gives a single one. -/// /// -/// namespace Nesting { - -/// /// -/// /// The main entry point of the actor system and its subsystems -/// /// -/// @test -/// pub def main(): Unit & Impure = { -/// let sys = ActorSystem.start(system(2)); -/// let c = Channel.new(); -/// <- Timer.milliseconds(200i64); // wait for initialization -/// // ping and wait an answer -/// if (???(sys, "nested", Msg("PING", c))) { // TODO: ActorSystem.send -/// <- c; -/// () -/// } else { -/// () -/// }; -/// ActorSystem.shutdown(sys) -/// } - -/// /// -/// /// Returns an actor system with one actor. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system(n: Int): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "nested" -> nestedActor(n, None) -/// }; -/// { actors = actors | empty } -/// } - -/// /// -/// /// An actor that nests other actor systems -/// /// -/// pub def nestedActor(level: Int, s: Option[RunningActorSystem], dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// if (level == 0) { -/// // We got to the end of the nesting chain, just recurse -/// c <- Ready; -/// nestedActor(level, None, dir, mailbox) -/// } else { -/// // We need to nest more -/// let sys = ActorSystem.start(system(level-1)); -/// c <- Ready; -/// nestedActor(level, Some(sys), dir, mailbox) -/// } -/// case Stop => -/// match s { -/// // Shutdown children systems before shutting down ourselves -/// case Some(sys) => ActorSystem.shutdown(sys) -/// case None => () -/// }; -/// Done -/// case Msg("PING", c) => -/// // When we get a PING message, we either send it to the nested subsystem, or answer if we're the last subsystem -/// match s { -/// case Some(sys) => ???(sys, "nested", Msg("PING", c)); () // TODO: ActorSystem.send -/// case None => c <- Msg("PONG", c); () -/// }; -/// nestedActor(level, s, dir, mailbox) -/// case Msg(msg, _) => ResumableCrash(nestedActor(level, s), "Unexpected message: " + msg) -/// } -/// } diff --git a/examples/PingPong.flix b/examples/PingPong.flix deleted file mode 100644 index eef02d1..0000000 --- a/examples/PingPong.flix +++ /dev/null @@ -1,76 +0,0 @@ -/// /// -/// /// An actor system with two actors that sends pings and pongs. -/// /// -/// namespace PingPong { - -/// /// -/// /// The main entry point for the single actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(100i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with a ping actor and a pong actor. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "ping" -> pingActor(2), -/// "pong" -> pongActor(2) -/// }; -/// { actors = actors | empty } -/// } - -/// /// -/// /// An actor that sends ping messages. -/// /// -/// pub def pingActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// c <- Ready; -/// mailbox <- Msg("Pong", mailbox); -/// pingActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("Pong", _) if n == 0 => -/// Console.printLine("pingActor is done."); -/// Done -/// case Msg("Pong", _) => -/// Console.printLine("Received Pong!"); -/// match ActorDirectory.getActor(dir, "pong") { -/// case None => pingActor(n, dir, mailbox) -/// case Some(c) => -/// c <- Msg("Ping", mailbox); -/// Console.printLine("Sent Ping!"); -/// pingActor(n - 1, dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(pingActor(n), "Unexpected message: " + msg) -/// } - -/// /// -/// /// An actor that sends pong messages. -/// /// -/// pub def pongActor(n: Int, dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// c <- Ready; -/// pongActor(n, dir, mailbox) -/// case Stop => Done -/// case Msg("Ping", _) if n == 0 => -/// Console.printLine("pongActor is done."); -/// Done -/// case Msg("Ping", _) => -/// Console.printLine("Received Ping!"); -/// match ActorDirectory.getActor(dir, "ping") { -/// case None => pongActor(n, dir, mailbox) -/// case Some(c) => -/// c <- Msg("Pong", mailbox); -/// Console.printLine("Sent Pong!"); -/// pongActor(n - 1, dir, mailbox) -/// } -/// case Msg(msg, _) => ResumableCrash(pongActor(n), "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/RestartAndStop.flix b/examples/RestartAndStop.flix deleted file mode 100644 index b051427..0000000 --- a/examples/RestartAndStop.flix +++ /dev/null @@ -1,104 +0,0 @@ -/// /// -/// /// An actor system with a master-worker model. -/// /// -/// /// If a worker actor crashes *once* it is restarted. If it crashes again, everything is stopped. -/// /// -/// namespace RestartAndStop { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(500i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with one counter actor and multiple worker actors. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "master" -> masterActor(Nil), -/// "worker1" -> workerActor, -/// "worker2" -> workerActor, -/// "worker3" -> workerActor -/// }; -/// let restartPolicy = #{ -/// // Compute the actors that have non-resumably crashed more than once. -/// Blocked(x) :- -/// Log(x, Running, NonResumablyCrashed, timestamp1), -/// Log(x, Running, NonResumablyCrashed, timestamp2), -/// if timestamp1 != timestamp2. - -/// Start(x) :- ActorState(x, NonResumablyCrashed), not Blocked(x). - -/// // TODO: Magnus It is becoming painful to have to reuse "Blocked". -/// // TODO: Would be better to have a name "Workers". -/// Stop(x) :- Actor(x), Blocked("worker1"). -/// Stop(x) :- Actor(x), Blocked("worker2"). -/// Stop(x) :- Actor(x), Blocked("worker3"). - -/// }; -/// { actors = actors, restartPolicy = restartPolicy | empty } -/// } - -/// /// -/// /// A master actor that communicates with its workers. -/// /// -/// pub def masterActor(workers: List[Mailbox], dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// Console.printLine("[Master] Ready"); -/// mailbox <- Msg("GO", mailbox); -/// let workers = Option.getWithDefault({ -/// let flatMap = Option.flatMap; -/// let* worker1 = ActorDirectory.getActor(dir, "worker1"); -/// let* worker2 = ActorDirectory.getActor(dir, "worker2"); -/// let* worker3 = ActorDirectory.getActor(dir, "worker3"); -/// Some(worker1 :: worker2 :: worker3 :: Nil) -/// }, Nil); -/// c <- Ready; -/// masterActor(workers, dir, mailbox) -/// case Stop => -/// Console.printLine("[Master] Stopped."); -/// Done -/// case Msg("GO", _) => -/// workers |> List.mapWithIndex((worker, index) -> { -/// if (index == 0) { -/// worker <- Msg("KABOOM", mailbox) -/// } else { -/// worker <- Msg("WORK", mailbox) -/// } -/// }); -/// <- Timer.milliseconds(50i64); -/// mailbox <- Msg("GO", mailbox); -/// masterActor(workers, dir, mailbox) -/// case Msg(msg, _) => ResumableCrash(masterActor(workers), "Unexpected message: " + msg) -/// } - -/// /// -/// /// An worker actor that can work or explode. -/// /// -/// pub def workerActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// Console.printLine("[Worker] Ready."); -/// c <- Ready; -/// workerActor(dir, mailbox) -/// case Stop => -/// Console.printLine("[Worker] Stopped."); -/// Done -/// case Msg("WORK", _) => -/// Console.printLine("[Worker] Working."); -/// workerActor(dir, mailbox) -/// case Msg("KABOOM", _) => -/// Console.printLine("[Worker] Kaboom!"); -/// NonResumableCrash("WHAM!") -/// case Msg(msg, _) => ResumableCrash(workerActor, "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/RestartOnce.flix b/examples/RestartOnce.flix deleted file mode 100644 index af98341..0000000 --- a/examples/RestartOnce.flix +++ /dev/null @@ -1,54 +0,0 @@ -/// /// -/// /// An actor system with an actor that crashes and is allowed to restart once. -/// /// -/// namespace RestartOnce { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(100i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with one counter actor and multiple worker actors. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "unstable" -> unstableActor -/// }; -/// let restartPolicy = #{ -/// // Compute the actors that have non-resumably crashed more than once. -/// Blocked(x) :- -/// Log(x, Running, NonResumablyCrashed, time1), -/// Log(x, Running, NonResumablyCrashed, time2), -/// if time1 != time2. - -/// Start(x) :- ActorState(x, NonResumablyCrashed), not Blocked(x). -/// }; -/// { actors = actors, restartPolicy = restartPolicy | empty } -/// } - -/// /// -/// /// An actor that explodes. -/// /// -/// pub def unstableActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// Console.printLine("Ready..."); -/// c <- Ready; -/// mailbox <- Msg("KABOOM", mailbox); -/// unstableActor(dir, mailbox) -/// case Stop => Done -/// case Msg("KABOOM", _) => -/// Console.printLine("Kaboom!"); -/// NonResumableCrash("WHAM!") -/// case Msg(msg, _) => ResumableCrash(unstableActor, "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/RestartOncePer5Min.flix b/examples/RestartOncePer5Min.flix deleted file mode 100644 index 203fea8..0000000 --- a/examples/RestartOncePer5Min.flix +++ /dev/null @@ -1,54 +0,0 @@ -/// /// -/// /// An actor system with an actor that crashes and is allowed to restart once. -/// /// -/// namespace RestartOncePerFive { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(100i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns an actor system with one counter actor and multiple worker actors. -/// /// -/// /// The actor system uses empty policies. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "unstable" -> unstableActor -/// }; -/// let restartPolicy = #{ -/// // Compute the actors that have non-resumably crashed in the last second. -/// Blocked(x) :- -/// Log(x, Running, NonResumablyCrashed, timestamp), -/// Clock(now), -/// if timestamp `Instant.after` (Instant.minus(now, Duration.oneSecond())). - -/// Start(x) :- ActorState(x, NonResumablyCrashed), not Blocked(x). -/// }; -/// { actors = actors, restartPolicy = restartPolicy | empty } -/// } - -/// /// -/// /// An actor that explodes. -/// /// -/// pub def unstableActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => -/// Console.printLine("Ready..."); -/// c <- Ready; -/// mailbox <- Msg("KABOOM", mailbox); -/// unstableActor(dir, mailbox) -/// case Stop => Done -/// case Msg("KABOOM", _) => -/// Console.printLine("Kaboom!"); -/// NonResumableCrash("WHAM!") -/// case Msg(msg, _) => ResumableCrash(unstableActor, "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/RestartParent.flix b/examples/RestartParent.flix deleted file mode 100644 index 68144a8..0000000 --- a/examples/RestartParent.flix +++ /dev/null @@ -1,72 +0,0 @@ -/// /// -/// /// An actor system with a master-worker model where a crash in a worker restarts the worker and master. -/// /// -/// namespace RestartParent { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(50i64); -/// ActorDirectory.send(sys, "worker2", Msg("OHNO", Channel.new())); -/// <- Timer.milliseconds(50i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns the actor system. -/// /// -/// pub def system(): ActorSystem = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "master" -> masterActor, -/// "worker1" -> workerActor, -/// "worker2" -> workerActor, -/// "worker3" -> workerActor -/// }; -/// let dependencies = #{ -/// DependsOn("worker1", "master"). -/// DependsOn("worker2", "master"). -/// DependsOn("worker3", "master"). -/// }; -/// let restartPolicy = #{ -/// // TODO: Magnus: It would be useful to have a predicate to group the workers. - -/// // Start a non-resumably crashed actor. -/// Start(x) :- ActorState(x, NonResumablyCrashed). - -/// // Restart the master, if one of its workers crashes. -/// Restart("master") :- ActorState("worker1", NonResumablyCrashed). -/// Restart("master") :- ActorState("worker2", NonResumablyCrashed). -/// Restart("master") :- ActorState("worker3", NonResumablyCrashed). -/// }; -/// { actors = actors, -/// startPolicy = emptyStartPolicy() <+> dependencies, -/// shutdownPolicy = emptyShutdownPolicy() <+> dependencies, -/// restartPolicy = restartPolicy -/// | empty } -/// } - -/// /// -/// /// A master actor. -/// /// -/// pub def masterActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => Console.printLine("[Master] Ready."); c <- Ready; masterActor(dir, mailbox) -/// case Stop => Console.printLine("[Master] Stopped."); Done -/// case Msg(msg, _) => ResumableCrash(masterActor, "Unexpected message: " + msg) -/// } - -/// /// -/// /// A worker actor. -/// /// -/// pub def workerActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] = -/// match <- mailbox { -/// case Start(c) => Console.printLine("[Worker] Ready."); c <- Ready; workerActor(dir, mailbox) -/// case Stop => Console.printLine("[Worker] Stopped."); Done -/// case Msg("OHNO", _) => Console.printLine("[Worker] Crashed."); NonResumableCrash("WHAM!") -/// case Msg(msg, _) => ResumableCrash(workerActor, "Unexpected message: " + msg) -/// } - -/// } diff --git a/examples/StartupAndShutdown.flix b/examples/StartupAndShutdown.flix deleted file mode 100644 index 5e3f07b..0000000 --- a/examples/StartupAndShutdown.flix +++ /dev/null @@ -1,100 +0,0 @@ -/// /// -/// /// An example that demonstrates how actors can be started and stopped in an orderly fashion. -/// /// -/// namespace StartupAndShutdown { - -/// /// -/// /// The main entry point of the actor system. -/// /// -/// @test -/// pub def main(): Unit & Impure = -/// let sys = ActorSystem.start(system()); -/// <- Timer.milliseconds(100i64); -/// ActorSystem.shutdown(sys) - -/// /// -/// /// Returns the actor system. -/// /// -/// pub def system(): ActorSystem & Impure = { -/// let empty = ActorSystem.empty(); -/// let actors = Map#{ -/// "authenticationActor" -> authenticationActor, -/// "databaseActor" -> databaseActor, -/// "loadBalanceActor" -> loadBalanceActor, -/// "webworker1" -> webWorkerActor, -/// "webworker2" -> webWorkerActor, -/// "webworker3" -> webWorkerActor, -/// "loggingActor" -> loggingActor -/// }; -/// // The actors are started and stopped according to their dependencies. -/// let dependencies = #{ -/// DependsOn("databaseActor", "loggingActor"). - -/// DependsOn("authenticationActor", "databaseActor"). -/// DependsOn("authenticationActor", "loggingActor"). - -/// DependsOn("loadBalanceActor", "authenticationActor"). -/// DependsOn("loadBalanceActor", "databaseActor"). -/// DependsOn("loadBalanceActor", "loggingActor"). - -/// DependsOn("webworker1", "loadBalanceActor"). -/// DependsOn("webworker2", "loadBalanceActor"). -/// DependsOn("webworker3", "loadBalanceActor"). -/// }; -/// { actors = actors, -/// startPolicy = emptyStartPolicy() <+> dependencies, -/// shutdownPolicy = emptyShutdownPolicy() <+> dependencies -/// | empty } -/// } - -/// /// -/// /// An authentication actor. -/// /// -/// pub def authenticationActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => Console.printLine("[Authentication] Ready."); c <- Ready; authenticationActor(dir, mailbox) -/// case Stop => Console.printLine("[Authentication] Stopped."); Done -/// case Msg(msg, _) => ResumableCrash(authenticationActor, "Unexpected message: " + msg) -/// } - -/// /// -/// /// A database actor. -/// /// -/// pub def databaseActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => Console.printLine("[Database] Ready."); c <- Ready; databaseActor(dir, mailbox) -/// case Stop => Console.printLine("[Database] Stopped."); Done -/// case Msg(msg, _) => ResumableCrash(databaseActor, "Unexpected message: " + msg) -/// } - -/// /// -/// /// A load balance actor. -/// /// -/// pub def loadBalanceActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => Console.printLine("[LoadBalance] Ready."); c <- Ready; loadBalanceActor(dir, mailbox) -/// case Stop => Console.printLine("[LoadBalance] Stopped."); Done -/// case Msg(msg, _) => ResumableCrash(loadBalanceActor, "Unexpected message: " + msg) -/// } - -/// /// -/// /// A web worker actor. -/// /// -/// pub def webWorkerActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => Console.printLine("[WebWorker] Ready."); c <- Ready; webWorkerActor(dir, mailbox) -/// case Stop => Console.printLine("[WebWorker] Stopped."); Done -/// case Msg(msg, _) => ResumableCrash(webWorkerActor, "Unexpected message: " + msg) -/// } - -/// /// -/// /// A logging actor. -/// /// -/// pub def loggingActor(dir: ActorDirectory, mailbox: Mailbox): ActorResult[Str] & Impure = -/// match <- mailbox { -/// case Start(c) => Console.printLine("[Logging] Ready."); c <- Ready; loggingActor(dir, mailbox) -/// case Stop => Console.printLine("[Logging] Stopped."); Done -/// case Msg(msg, _) => ResumableCrash(loggingActor, "Unexpected message: " + msg) -/// } - -/// } diff --git a/flix.toml b/flix.toml new file mode 100644 index 0000000..524e086 --- /dev/null +++ b/flix.toml @@ -0,0 +1,6 @@ +[package] +name = "actors" +description = "test" +version = "0.1.0" +flix = "0.67.1" +authors = ["John Doe "] diff --git a/src/ActorBehavior.flix b/src/ActorBehavior.flix new file mode 100644 index 0000000..504ef3d --- /dev/null +++ b/src/ActorBehavior.flix @@ -0,0 +1,6 @@ +/// +/// An actor behavior is a function from an actor directory, a receiver (for incoming messages), +/// and a sender (its own mailbox for others to reply to) to an actor result. +/// The effect is polymorphic so the behavior can run in any region. +/// +pub type alias ActorBehavior = ActorDirectory -> Receiver[Message] -> Mailbox -> ActorResult[String] \ {Chan, NonDet, IO} diff --git a/src/ActorDirectory.flix b/src/ActorDirectory.flix new file mode 100644 index 0000000..6482a47 --- /dev/null +++ b/src/ActorDirectory.flix @@ -0,0 +1,33 @@ +mod ActorDirectory { + + /// + /// Returns an actor mailbox (blocking function). + /// + pub def getActor(s: ActorDirectory, name: String): Option[Mailbox] \ {Chan, NonDet} = { + let RunningActorSystem.RunningActorSystem(sys) = s; + let (replySender, replyReceiver) = Channel.unbuffered(); + Channel.send(SystemEvent.GetActor(name, replySender), sys); + Channel.recv(replyReceiver) + } + + /// + /// Sends a message to an actor, given its name. + /// + pub def send(s: RunningActorSystem, actorName: String, msg: Message): Bool \ {Chan, NonDet} = { + let RunningActorSystem.RunningActorSystem(sys) = s; + let (replySender, replyReceiver) = Channel.unbuffered(); + Channel.send(SystemEvent.SendMessage(actorName, msg, replySender), sys); + Channel.recv(replyReceiver) + } + + /// + /// Dynamically creates a new actor, given its parent name, its behavior, and category. + /// + pub def create(s: RunningActorSystem, parent: String, behavior: ActorBehavior, category: String): Mailbox \ {Chan, NonDet} = { + let RunningActorSystem.RunningActorSystem(sys) = s; + let (replySender, replyReceiver) = Channel.unbuffered(); + Channel.send(SystemEvent.CreateActor(parent, behavior, category, replySender), sys); + Channel.recv(replyReceiver) + } + +} diff --git a/src/ActorPolicy.flix b/src/ActorPolicy.flix new file mode 100644 index 0000000..0c90b1d --- /dev/null +++ b/src/ActorPolicy.flix @@ -0,0 +1,150 @@ +/// +/// A type alias for the row type of an actor policy. +/// +/// Relations: +/// - Actor(name: String) - represents actors in the system +/// - ActorState(name: String, state: State) - current state of an actor +/// - DependsOn(name1: String, name2: String) - dependencies between actors +/// - Start(name: String) - actors that should be started +/// - Stop(name: String) - actors that should be stopped +/// - Restart(name: String) - actors that should be restarted with fresh state +/// - Resume(name: String) - actors that should be resumed from last good state +/// - Log(name: String, last: State, next: State, timestamp: Int64) - transition history +/// - Clock(timestamp: Int64) - current time +/// - Blocked(name: String) - auxiliary relation for dependency checking +/// - Sibling(name1: String, name2: String) - actors with same parent +/// - Created(parent: String, child: String) - parent-child relationship +/// - Category(name: String, category: String) - categorization of actors +/// +pub type alias ActorPolicy = #{ + Actor(String), + DependsOn(String, String), + Start(String), + Stop(String), + Restart(String), + Resume(String), + Log(String, Int32, Int32, Int64), + Clock(Int64), + Blocked(String), + Sibling(String, String), + Created(String, String), + Category(String, String), + Running(String), + InTransition(String), + Stopped(String), + ResumablyCrashed(String), + NonResumablyCrashed(String) +} + +/// +/// If an actor policy specifies multiple actions for the same actor then the final action is computed by join: +/// +/// Start, Start => Start +/// Start, Stop => Start +/// Stop, Start => Start +/// Resume, Stop => Resume +/// Stop, Resume => Resume +/// Start, Resume => Resume +/// Resume, Start => Resume +/// x, x => x +/// + +/// +/// An action is combined with an actor state according to the following table: +/// +/// <: +/// - Start => start the actor. +/// - Stop => no-op. +/// - Resume => no-op. +/// InTransition (actor is changing its state): +/// - Start => no-op. +/// - Stop => no-op. +/// - Resume => no-op. +/// Running: +/// - Start => no-op. +/// - Stop => stop the actor. +/// - Resume => no-op. +/// Stopped: +/// - Start => start the actor. +/// - Stop => no-op. +/// - Resume => start the actor (with the initial state). +/// ResumablyCrashed: +/// - Start => restart the actor with the initial state. (required to allow a reset of the actor) +/// - Stop => change state to stopped (clears the error) (required to allow the error to be cleared). +/// - Resume => resume. +/// NonResumablyCrashed: +/// - Start => start the actor with initial state. +/// - Stop => change state to stopped (clears the error). +/// - Resume => start the actor with initial state. +/// + +mod ActorPolicy { + + /// + /// Returns the empty start policy. + /// + pub def emptyStartPolicy(): ActorPolicy = #{ + // Compute the transitive closure of the DependsOn relation. + DependsOn(x, z) :- DependsOn(x, y), DependsOn(y, z). + + // Compute the actors that are waiting for one or more of its dependencies to be running. + // Note1: An actor is *not* blocked if it depends on itself. This allows actors in cyclic dependencies to be started. + // Note2: An actor that has no dependencies is trivially *never* blocked. + Blocked(x) :- DependsOn(x, y), not Running(y), not DependsOn(x, x). + + // Start actors that are not blocked. + Start(x) :- Actor(x), not Blocked(x). + } + + /// + /// Returns a policy that immediately starts all actors regardless of their dependencies. + /// + pub def immediatelyStartAllPolicy(): ActorPolicy = #{ + Start(actor) :- Actor(actor). + } + + /// + /// Returns the empty shutdown policy. + /// + pub def emptyShutdownPolicy(): ActorPolicy = #{ + // Compute the transitive closure of the DependsOn relation. + DependsOn(x, z) :- DependsOn(x, y), DependsOn(y, z). + + // Compute the actors that are waiting for one or more of its dependencies to be stopped. + // Note1: An actor is *not* blocked if it depends on itself. This allows actors in cyclic dependencies to be stopped. + // Note2: An actor that no-one depends on is trivially *never* blocked. + Blocked(x) :- DependsOn(y, x), Running(y), not DependsOn(x, x). + + // Stop actors that are not blocked. + Stop(x) :- Actor(x), Running(x), not Blocked(x). + } + + /// + /// Returns a policy that immediately shuts down all actors regardless of their dependencies. + /// + pub def immediatelyShutdownAllPolicy(): ActorPolicy = #{ + Stop(actor) :- Running(actor). + } + + /// + /// Returns the empty restart policy. + /// + pub def emptyRestartPolicy(): ActorPolicy = oneForOneRestartPolicy() + + /// + /// Returns a restart policy that restarts an actor when it crashes. + /// + pub def oneForOneRestartPolicy(): ActorPolicy = #{ + Resume(x) :- ResumablyCrashed(x). + Start(x) :- NonResumablyCrashed(x). + } + + /// + /// Returns a restart policy that restarts an actor along with its siblings when it crashes. + /// + pub def oneForAllRestartPolicy(): ActorPolicy = oneForOneRestartPolicy() <+> #{ + Start(x) :- Sibling(x, y), ResumablyCrashed(y). + Start(x) :- Sibling(x, y), NonResumablyCrashed(y). + } + +} diff --git a/src/main/actor/ActorResult.flix b/src/ActorResult.flix similarity index 95% rename from src/main/actor/ActorResult.flix rename to src/ActorResult.flix index f00a910..1d122fe 100644 --- a/src/main/actor/ActorResult.flix +++ b/src/ActorResult.flix @@ -18,7 +18,7 @@ pub enum ActorResult[e] { /// /// Indicates that the actor terminated normally. /// - case Done, + case Done /// /// Indicates that the actor crashed with error value `e`. @@ -26,7 +26,7 @@ pub enum ActorResult[e] { /// @param k the continuation that can be used to resume the actor. /// @param e the error value. /// - case ResumableCrash(ActorBehavior, e), + case ResumableCrash(ActorBehavior, e) /// /// Indicates that the actor crashed with error value `e` and cannot be resumed. diff --git a/src/ActorSystem.flix b/src/ActorSystem.flix new file mode 100644 index 0000000..d052829 --- /dev/null +++ b/src/ActorSystem.flix @@ -0,0 +1,560 @@ +/// +/// An actor system is a type alias for a record that consists of four fields: +/// +/// @field `actors` that declares the actors in the system. +/// @field `startPolicy` that describes the policy for how actors are started. +/// @field `restartPolicy` that describes the policy for how actors are restarted if they crash. +/// @field `shutdownPolicy` that describes the policy for how actors are stopped. +/// +pub type alias ActorSystem = { + actors = Map[String, ActorBehavior], + startPolicy = ActorPolicy, + restartPolicy = ActorPolicy, + shutdownPolicy = ActorPolicy +} + +/// +/// A running actor system is an opaque type wrapping a sender for system events. +/// +pub enum RunningActorSystem(Sender[SystemEvent]) + +/// +/// An actor is identified by its mailbox (sender). +/// +pub type alias ActorRef = Mailbox + +/// +/// An actor directory, used to retrieve mailboxes of actors in the system. +/// +pub type alias ActorDirectory = RunningActorSystem + +/// +/// Internal enum representing the state of an actor. +/// +enum InternalActorState { + /// + /// An actor that has not been started previously, and has no mailbox. + /// + case NotStarted + + /// + /// An actor that has been started and been assigned a mailbox. The actor could be stopped or crashed now. + /// + case Started(State, ActorRef) +} + +/// Internal record type for tracking running actors. +/// map: The actors running +/// log: The log of all events that happened +/// restarting: The actors that are currently restarting +/// dynamicActors: Actors dynamically created with parent and category +type alias ActorMap = { + map = Map[String, (InternalActorState, ActorBehavior, Option[ActorBehavior])], + log = List[(String, State, State, Int64)], + restarting = List[String], + dynamicActors = List[(String, String, String)] +} + +/// +/// Internal enum for actor actions. +/// +enum ActorAction { + case ActionResume + case ActionStart + case ActionRestart + case ActionStop +} + +/// +/// Internal enum representing the policy state. +/// +enum Policy { + case PolicyNotStarted + case PolicyRunning + case PolicyStopping(Sender[Unit]) +} + +mod ActorSystem { + + use ActorPolicy.{emptyStartPolicy, emptyShutdownPolicy, emptyRestartPolicy} + + /// + /// Returns an actor system with an empty configuration. + /// + pub def empty(): ActorSystem = { + actors = Map.empty(), + startPolicy = emptyStartPolicy(), + shutdownPolicy = emptyShutdownPolicy(), + restartPolicy = emptyRestartPolicy() + } + + /// + /// Returns the default mailbox capacity. + /// + pub def emptyMailboxCapacity(): Int32 = 1000 * 1000 + + /// + /// Helper to convert State to an Int32 for logging. + /// + def stateToInt(st: State): Int32 = match st { + case State.InTransition => 0 + case State.Running => 1 + case State.Stopped => 2 + case State.ResumablyCrashed => 3 + case State.NonResumablyCrashed => 4 + } + + /// + /// Returns the current state of the actors in the system, using state relations and the Actor relation. + /// + def statesOfActors(s: ActorSystem, actors: ActorMap): ActorPolicy = { + // Builds the state relations (Running, InTransition, Stopped, ResumablyCrashed, NonResumablyCrashed) + let statesRel = Map.foldLeftWithKey((states, actorName, actor) -> { + let (internalState, _, _) = actor; + match internalState { + case InternalActorState.NotStarted => states <+> #{Stopped(actorName). } + case InternalActorState.Started(State.InTransition, _) => states <+> #{InTransition(actorName). } + case InternalActorState.Started(State.Running, _) => states <+> #{Running(actorName). } + case InternalActorState.Started(State.Stopped, _) => states <+> #{Stopped(actorName). } + case InternalActorState.Started(State.ResumablyCrashed, _) => states <+> #{ResumablyCrashed(actorName). } + case InternalActorState.Started(State.NonResumablyCrashed, _) => states <+> #{NonResumablyCrashed(actorName). } + } + }, #{}, actors#map); + + // Builds the Log relation (using Int32 for state codes) + let logRel = List.foldLeft((log, data) -> { + let (actorName, stateBefore, stateAfter, timestamp) = data; + log <+> #{Log(actorName, stateToInt(stateBefore), stateToInt(stateAfter), timestamp). } + }, #{}, actors#log); + + // Builds the Actor relation + let actorsRel = Set.foldLeft((rel, actor: String) -> { + rel <+> #{Actor(actor). } + }, #{}, Map.keysOf(s#actors)); + + // Builds the Created and Category relations, and fills the Actor relation with dynamic actors + let createdRel = List.foldLeft((rel, dyn) -> { + let (name, parent, category) = dyn; + rel <+> #{Actor(name). Created(parent, name). Category(name, category). } + }, #{}, actors#dynamicActors); + + // Add the rule for the Sibling relation + let sibling = #{Sibling(x, y) :- Actor(parent), Created(parent, x), Created(parent, y). }; + + let rightNow = 0i64; + statesRel <+> logRel <+> actorsRel <+> createdRel <+> sibling <+> #{Clock(rightNow). } + } + + /// + /// Checks that all the actors are in the Stopped state. + /// + def allActorsStopped(state: ActorPolicy): Bool = { + let inTransition = query state select (name) from InTransition(name); + let running = query state select (name) from Running(name); + Vector.isEmpty(inTransition) and Vector.isEmpty(running) + } + + /// + /// Starts an actor given its name, behavior, and mailbox sender. Returns the updated map of running actors. + /// The rc parameter is the shared region where all actor processes are spawned. + /// + def startActor(rc: Region[r], actors: ActorMap, controlSender: Sender[SystemEvent], actorName: String, behavior: ActorBehavior, mailboxSender: Sender[Message], mailboxReceiver: Receiver[Message]): ActorMap \ {Chan, NonDet, IO} = { + // Create the process of the new actor + spawn ( + match behavior(RunningActorSystem.RunningActorSystem(controlSender), mailboxReceiver, mailboxSender) { + case ActorResult.Done => + Channel.send(SystemEvent.ActorDone(actorName), controlSender) + case ActorResult.ResumableCrash(resumableBehavior, _) => + Channel.send(SystemEvent.ActorResumableCrash(actorName, resumableBehavior), controlSender) + case ActorResult.NonResumableCrash(_) => + Channel.send(SystemEvent.ActorNonResumableCrash(actorName), controlSender) + } + ) @ rc; + + // The channel on which the actor will notify us that it has successfully initialized + let (readySender, readyReceiver) = Channel.unbuffered(); + + // Tell the actor to initialize + Channel.send(Message.Start(readySender), mailboxSender); + + // Spawned process to wait for actor to be ready + spawn ({ + discard Channel.recv(readyReceiver); + Channel.send(SystemEvent.ActorReady(actorName), controlSender) + }) @ rc; + + // Add the actor to the map of running actors. It is now in transition. + let newLog = match Map.get(actorName, actors#map) { + case Some((InternalActorState.Started(st, _), _, _)) => (actorName, st, State.InTransition, 0i64) + case _ => (actorName, State.Stopped, State.InTransition, 0i64) + }; + { + map = Map.insert(actorName, (InternalActorState.Started(State.InTransition, mailboxSender), behavior, None), actors#map), + log = newLog :: actors#log + | actors + } + } + + /// + /// Runs the shutdown policy of an actor system. + /// Returns the updated actor map after one iteration of the policy, or None if the system should stop. + /// + def runShutdownPolicy(_rc: Region[r], s: ActorSystem, actors: ActorMap, _controlSender: Sender[SystemEvent], notifySender: Sender[Unit]): Option[ActorMap] \ {Chan, IO} = { + println("[system] shutdown policy"); + let currentState = statesOfActors(s, actors); + + let stopQuery = query currentState, s#restartPolicy, s#shutdownPolicy select (actorName) from Stop(actorName); + let (actors2, n) = Vector.foldLeft((acc, actorName) -> { + let (m, count) = acc; + match Map.get(actorName, m#map) { + case Some((InternalActorState.Started(State.Running, mailbox), _, _)) => + println("!!!!! [system] sending stop to ${actorName}"); + // Running, Stop => stop the actor + Channel.send(Message.Stop, mailbox); + (m, count + 1) + + case Some((InternalActorState.Started(State.ResumablyCrashed, mailbox), behavior, resumableBehavior)) => + // ResumablyCrashed, Stop => change state to stopped + println("[system] setting crashed1 to stop: ${actorName}"); + ({ + map = Map.insert(actorName, (InternalActorState.Started(State.Stopped, mailbox), behavior, resumableBehavior), m#map), + log = (actorName, State.ResumablyCrashed, State.Stopped, 0i64) :: m#log + | m + }, count) + + case Some((InternalActorState.Started(State.NonResumablyCrashed, mailbox), behavior, resumableBehavior)) => + // NonResumablyCrashed, Stop => change state to stopped + println("[system] setting crashed2 to stop: ${actorName}"); + ({ + map = Map.insert(actorName, (InternalActorState.Started(State.Stopped, mailbox), behavior, resumableBehavior), m#map), + log = (actorName, State.NonResumablyCrashed, State.Stopped, 0i64) :: m#log + | m + }, count) + + case _ => + println("[system] non-case for : ${actorName}"); + // Other cases are no-ops + (m, count) + } + }, (actors, 0), stopQuery); + + if (n == 0 and allActorsStopped(currentState)) { + println("[system] fully stopped"); + // System fully stopped + Channel.send((), notifySender); // Notify the requester of the shutdown + None + } else { + println("[system] still some actors to terminate"); + // Some actors still have to be stopped + Some(actors2) + } + } + + /// + /// Runs the restart policy of an actor system. + /// + def runRestartPolicy(rc: Region[r], s: ActorSystem, actors: ActorMap, controlSender: Sender[SystemEvent]): ActorMap \ {Chan, NonDet, IO} = { + let currentState = statesOfActors(s, actors); + let policy = currentState <+> s#startPolicy <+> s#restartPolicy; + + // We just accumulate for each actor, whether it should 1. Stop, 2. Start, 3. Resume, overriding any previous value. + let m0: Map[String, ActorAction] = Map.empty(); + + // The order of the following folds are important: the later folds will take precedence over the previous ones. + let stopQuery = query policy select (actorName) from Stop(actorName); + let m1 = Vector.foldLeft((m, actorName) -> Map.insert(actorName, ActorAction.ActionStop, m), m0, stopQuery); + + let startQuery = query policy select (actorName) from Start(actorName); + let m2 = Vector.foldLeft((m, actorName) -> Map.insert(actorName, ActorAction.ActionStart, m), m1, startQuery); + + let restartQuery = query policy select (actorName) from Restart(actorName); + let m3 = Vector.foldLeft((m, actorName) -> Map.insert(actorName, ActorAction.ActionRestart, m), m2, restartQuery); + + let resumeQuery = query policy select (actorName) from Resume(actorName); + let m4 = Vector.foldLeft((m, actorName) -> Map.insert(actorName, ActorAction.ActionResume, m), m3, resumeQuery); + + // Then, we just apply the action for each actor, according to the table. + Map.foldLeftWithKey((m, actorName, action) -> { + match action { + case ActorAction.ActionStart => match Map.get(actorName, m#map) { + case Some((InternalActorState.NotStarted, behavior, _)) => + // Start the actor with a new mailbox + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case Some((InternalActorState.Started(State.Stopped, _mailbox), behavior, _)) => + // Start the actor - need new receiver but reuse sender identity concept + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case Some((InternalActorState.Started(State.ResumablyCrashed, _mailbox), behavior, _)) => + // Start the actor with the initial state + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case Some((InternalActorState.Started(State.NonResumablyCrashed, _mailbox), behavior, _)) => + // Start the actor with the initial state + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case _ => + // No-op + m + } + case ActorAction.ActionRestart => match Map.get(actorName, m#map) { + case Some((InternalActorState.Started(State.Running, mailbox), _, _)) => + // If the actor is running, stop it first + Channel.send(Message.Stop, mailbox); + // Remember that the actor has to be restarted after it successfully stopped + {restarting = actorName :: m#restarting | m} + case Some((InternalActorState.Started(State.Stopped, _mailbox), behavior, _)) => + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case Some((InternalActorState.Started(State.ResumablyCrashed, _mailbox), behavior, _)) => + // Start the actor from a fresh initial state + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case Some((InternalActorState.Started(State.NonResumablyCrashed, _mailbox), behavior, _)) => + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case _ => + // No-op + m + } + case ActorAction.ActionResume => match Map.get(actorName, m#map) { + case Some((InternalActorState.Started(State.Stopped, _mailbox), behavior, _)) => + // Start the actor with the initial state + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case Some((InternalActorState.Started(State.ResumablyCrashed, _mailbox), behavior, resumableBehavior)) => + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, Option.getWithDefault(behavior, resumableBehavior), sender, receiver) + case Some((InternalActorState.Started(State.NonResumablyCrashed, _mailbox), behavior, _)) => + // Start the actor with initial state + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + startActor(rc, m, controlSender, actorName, behavior, sender, receiver) + case _ => + // No-op + m + } + case ActorAction.ActionStop => match Map.get(actorName, m#map) { + case Some((InternalActorState.Started(State.Running, mailbox), _, _)) => + // Stop the actor + Channel.send(Message.Stop, mailbox); + m + case Some((InternalActorState.Started(State.ResumablyCrashed, mailbox), behavior, resumableBehavior)) => + // Change the state of the crashed actor to stopped + { + map = Map.insert(actorName, (InternalActorState.Started(State.Stopped, mailbox), behavior, resumableBehavior), m#map), + log = (actorName, State.ResumablyCrashed, State.Stopped, 0i64) :: m#log + | m + } + case Some((InternalActorState.Started(State.NonResumablyCrashed, mailbox), behavior, resumableBehavior)) => + // Change the state of the crashed actor to stopped + { + map = Map.insert(actorName, (InternalActorState.Started(State.Stopped, mailbox), behavior, resumableBehavior), m#map), + log = (actorName, State.NonResumablyCrashed, State.Stopped, 0i64) :: m#log + | m + } + case _ => + // No-op + m + } + } + }, actors, m4) + } + + /// + /// Restart all actors that are "restarting" (have been sent a Stop signal) and have successfully stopped or crashed. + /// + def startRestartingActors(rc: Region[r], actors: ActorMap, controlSender: Sender[SystemEvent]): ActorMap \ {Chan, NonDet, IO} = { + let (actors2, restartingList) = List.foldLeft((acc, actorName) -> { + let (m, notRestarted) = acc; + match Map.get(actorName, m#map) { + case Some((InternalActorState.Started(State.Stopped, _mailbox), behavior, _)) => + // The actor successfully stopped, start it again + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + (startActor(rc, m, controlSender, actorName, behavior, sender, receiver), notRestarted) + case Some((InternalActorState.Started(State.ResumablyCrashed, _mailbox), behavior, _)) => + // The actor crashed (probably when stopping), start it again + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + (startActor(rc, m, controlSender, actorName, behavior, sender, receiver), notRestarted) + case Some((InternalActorState.Started(State.NonResumablyCrashed, _mailbox), behavior, _)) => + // The actor crashed (probably when stopping), start it again + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + (startActor(rc, m, controlSender, actorName, behavior, sender, receiver), notRestarted) + case _ => + // The actor is still running, don't start it now. + (m, actorName :: notRestarted) + } + }, (actors, Nil), actors#restarting); + // Only keep actors that have not been restarted as "restarting" actors + {restarting = restartingList | actors2} + } + + /// + /// Run the current policy. Returns the updated actor map, as well as the next state of the policy. + /// + def runPolicy(rc: Region[r], s: ActorSystem, actors: ActorMap, controlSender: Sender[SystemEvent], policy: Policy): Option[(ActorMap, Policy)] \ {Chan, NonDet, IO} = { + let result = match policy { + case Policy.PolicyNotStarted => Some((actors, policy)) + case Policy.PolicyRunning => Some((runRestartPolicy(rc, s, actors, controlSender), Policy.PolicyRunning)) + case Policy.PolicyStopping(c) => Option.map(a -> (a, Policy.PolicyStopping(c)), runShutdownPolicy(rc, s, actors, controlSender, c)) + }; + // Restart the actors that have to be restarted + Option.map(res -> (startRestartingActors(rc, fst(res), controlSender), snd(res)), result) + } + + /// + /// Runs the policy and recurses. + /// + def runPolicyAndRecurse(rc: Region[r], s: ActorSystem, actors: ActorMap, controlSender: Sender[SystemEvent], controlReceiver: Receiver[SystemEvent], policy: Policy): Unit \ {Chan, NonDet, IO} = + match runPolicy(rc, s, actors, controlSender, policy) { + case Some((actors2, policy2)) => actorSystemLoop(rc, s, actors2, controlSender, controlReceiver, policy2) + case None => () + } + + /// + /// The main loop of the actor system, handling messages over the control channel. + /// + def actorSystemLoop(rc: Region[r], s: ActorSystem, actors: ActorMap, controlSender: Sender[SystemEvent], controlReceiver: Receiver[SystemEvent], policy: Policy): Unit \ {Chan, NonDet, IO} = { + let timeout = Channel.timeout(1, TimeUnit.Seconds); + select { + case event <- recv(controlReceiver) => match event { + case SystemEvent.SystemStart => + runPolicyAndRecurse(rc, s, actors, controlSender, controlReceiver, Policy.PolicyRunning) + + case SystemEvent.SystemShutdown(c) => + runPolicyAndRecurse(rc, s, actors, controlSender, controlReceiver, Policy.PolicyStopping(c)) + + case SystemEvent.SendMessage(actorName, msg, c) => + match Map.get(actorName, actors#map) { + case Some((InternalActorState.Started(State.Running, mailbox), _, _)) => + Channel.send(msg, mailbox); + Channel.send(true, c) + case Some(_) => Channel.send(false, c) // Actor not running + case _ => Channel.send(false, c) // Actor not found + }; + // We don't run the policy here because that's an external event and had no way of making the actor system progress. + actorSystemLoop(rc, s, actors, controlSender, controlReceiver, policy) + + case SystemEvent.GetActor(actorName, c) => + let mb = Option.flatMap(act -> match act { + case (InternalActorState.Started(_, mailbox), _, _) => Some(mailbox) + case _ => None + }, Map.get(actorName, actors#map)); + Channel.send(mb, c); + actorSystemLoop(rc, s, actors, controlSender, controlReceiver, policy) + + case SystemEvent.CreateActor(parent, beh, category, replySender) => + let (sender, receiver) = Channel.buffered(emptyMailboxCapacity()); + let actorName = "${category}${0i64}"; + let actors2 = startActor(rc, actors, controlSender, actorName, beh, sender, receiver); + let actors3 = {dynamicActors = (actorName, parent, category) :: actors2#dynamicActors | actors2}; + Channel.send(sender, replySender); + runPolicyAndRecurse(rc, s, actors3, controlSender, controlReceiver, policy) + + case SystemEvent.ActorReady(actorName) => + let actors2 = match Map.get(actorName, actors#map) { + case Some((InternalActorState.Started(State.InTransition, mailbox), behavior, resumableBehavior)) => + // The actor has initialized and is now running + { + map = Map.insert(actorName, (InternalActorState.Started(State.Running, mailbox), behavior, resumableBehavior), actors#map), + log = (actorName, State.InTransition, State.Running, 0i64) :: actors#log + | actors + } + case _ => + // Ignore ready message if the actor is not in transition + actors + }; + runPolicyAndRecurse(rc, s, actors2, controlSender, controlReceiver, policy) + + case SystemEvent.ActorDone(actorName) => + let actors2 = match Map.get(actorName, actors#map) { + case Some((InternalActorState.Started(State.Running, mailbox), behavior, resumableBehavior)) => + { + map = Map.insert(actorName, (InternalActorState.Started(State.Stopped, mailbox), behavior, resumableBehavior), actors#map), + log = (actorName, State.Running, State.Stopped, 0i64) :: actors#log + | actors + } + case _ => + actors + }; + runPolicyAndRecurse(rc, s, actors2, controlSender, controlReceiver, policy) + + case SystemEvent.ActorResumableCrash(actorName, resumableBehavior) => + let actors2 = match Map.get(actorName, actors#map) { + case Some((InternalActorState.Started(st, mailbox), behavior, _)) => + { + map = Map.insert(actorName, (InternalActorState.Started(State.ResumablyCrashed, mailbox), behavior, Some(resumableBehavior)), actors#map), + log = (actorName, st, State.ResumablyCrashed, 0i64) :: actors#log + | actors + } + case _ => + actors + }; + runPolicyAndRecurse(rc, s, actors2, controlSender, controlReceiver, policy) + + case SystemEvent.ActorNonResumableCrash(actorName) => + let actors2 = match Map.get(actorName, actors#map) { + case Some((InternalActorState.Started(st, mailbox), behavior, resumableBehavior)) => + { + map = Map.insert(actorName, (InternalActorState.Started(State.NonResumablyCrashed, mailbox), behavior, resumableBehavior), actors#map), + log = (actorName, st, State.NonResumablyCrashed, 0i64) :: actors#log + | actors + } + case _ => + actors + }; + runPolicyAndRecurse(rc, s, actors2, controlSender, controlReceiver, policy) + } + + case _ <- recv(timeout) => + // Run the policy every second if nothing else happened + runPolicyAndRecurse(rc, s, actors, controlSender, controlReceiver, policy) + } + } + + /// + /// Starts the given actor system `s` asynchronously and runs the given function with a handle to the running system. + /// The system is automatically shut down after the function returns. + /// + pub def withActorSystem(s: ActorSystem, f: RunningActorSystem -> a \ ef): a \ {Chan, NonDet, IO, ef} = region rc { + let (controlSender, controlReceiver) = Channel.buffered(10); + let actors = Map.foldLeftWithKey((m, actorName, behavior) -> + Map.insert(actorName, (InternalActorState.NotStarted, behavior, None), m), + Map.empty(), s#actors); + let initialActorMap: ActorMap = {map = actors, log = Nil, restarting = Nil, dynamicActors = Nil}; + spawn actorSystemLoop(rc, s, initialActorMap, controlSender, controlReceiver, Policy.PolicyNotStarted) @ rc; + Channel.send(SystemEvent.SystemStart, controlSender); + let running = RunningActorSystem.RunningActorSystem(controlSender); + let result = f(running); + shutdown(running); + result + } + + /// + /// Composes two actor systems into a single one, if they can be joined. + /// Names of actors in both systems have to be distinct. + /// Policies are joined, which might not be what you want. In that case, it is a good idea to redefine the policies after composing the systems. + /// + pub def compose(s1: ActorSystem, s2: ActorSystem): Option[ActorSystem] = + if (Map.isEmpty(Map.intersection(s1#actors, s2#actors))) { + // No shared names, we can join + Some({ + actors = Map.union(s1#actors, s2#actors), + startPolicy = s1#startPolicy <+> s2#startPolicy, + restartPolicy = s1#restartPolicy <+> s2#restartPolicy, + shutdownPolicy = s1#shutdownPolicy <+> s2#shutdownPolicy + }) + } else { + // There are some names shared so we can't join the systems + None + } + + /// + /// Initiates a shutdown of the given actor system `s` and waits for its orderly shutdown. + /// + pub def shutdown(s: RunningActorSystem): Unit \ {Chan, NonDet} = { + let RunningActorSystem.RunningActorSystem(sys) = s; + let (sender, receiver) = Channel.unbuffered(); + Channel.send(SystemEvent.SystemShutdown(sender), sys); + Channel.recv(receiver) + } +} diff --git a/src/Mailbox.flix b/src/Mailbox.flix new file mode 100644 index 0000000..d0731d1 --- /dev/null +++ b/src/Mailbox.flix @@ -0,0 +1,5 @@ +/// +/// A mailbox is the sender side of a message channel. +/// Used by external code to send messages to an actor. +/// +pub type alias Mailbox = Sender[Message] diff --git a/src/Main.flix b/src/Main.flix new file mode 100644 index 0000000..9861293 --- /dev/null +++ b/src/Main.flix @@ -0,0 +1,56 @@ +// The main entry point. +def main(): Unit \ {Chan, NonDet, IO} = { + println("=== Running HelloWorld Example ==="); + HelloWorld.runExample(); + println(""); + + println("=== Running PingPong Example ==="); + PingPong.runExample(); + println(""); + + println("=== Running Counter Example ==="); + Counter.runExample(); + println(""); + + println("=== Running Become Example ==="); + Become.runExample(); + println(""); + + println("=== Running Countdown Example ==="); + Countdown.runExample(); + println(""); + + println("=== Running LoadBalancer Example ==="); + LoadBalancer.runExample(); + println(""); + + println("=== Running Composition Example ==="); + Composition.runExample(); + println(""); + + println("=== Running Nesting Example ==="); + Nesting.runExample(); + println(""); + + println("=== Running RestartOnce Example ==="); + RestartOnce.runExample(); + println(""); + + println("=== Running RestartOncePer5Min Example ==="); + RestartOncePer5Min.runExample(); + println(""); + + println("=== Running RestartAndStop Example ==="); + RestartAndStop.runExample(); + println(""); + + println("=== Running RestartParent Example ==="); + RestartParent.runExample(); + println(""); + + println("=== Running StartupAndShutdown Example ==="); + StartupAndShutdown.runExample(); + println(""); + + println("=== All examples completed ===") +} diff --git a/src/main/actor/Message.flix b/src/Message.flix similarity index 60% rename from src/main/actor/Message.flix rename to src/Message.flix index e9f2a96..08f87c6 100644 --- a/src/main/actor/Message.flix +++ b/src/Message.flix @@ -1,15 +1,15 @@ /// /// An actor message is one of: /// -/// - a `Start` message that instructs an actor to start. The start message includes a reply channel +/// - a `Start` message that instructs an actor to start. The start message includes a reply sender /// on which the actor must send `Ready` when it is ready to start processing messages. /// /// - a `Stop` message that instructs an actor to immediately shutdown. /// -/// - A `Msg` with with an underlying message and the mailbox of the sender. +/// - A `Msg` with an underlying message and the mailbox (sender) of the sender actor. /// pub enum Message { - case Start(Channel[Ready]), - case Stop, - case Msg(Str, Mailbox) + case Start(Sender[Ready]) + case Stop + case Msg(String, Mailbox) } diff --git a/src/Ready.flix b/src/Ready.flix new file mode 100644 index 0000000..d484b81 --- /dev/null +++ b/src/Ready.flix @@ -0,0 +1,6 @@ +/// +/// A special message sent by an actor to indicate that it is ready to receive messages. +/// +pub enum Ready with Eq, ToString { + case Ready +} diff --git a/src/main/actor/State.flix b/src/State.flix similarity index 82% rename from src/main/actor/State.flix rename to src/State.flix index 26fd5c5..5eeecec 100644 --- a/src/main/actor/State.flix +++ b/src/State.flix @@ -7,10 +7,10 @@ /// - ResumablyCrashed: the actor has crashed, but may be resumed. /// - NonResumablyCrashed: the actor has crashed and cannot be resumed. /// -pub enum State { - case InTransition, // TODO: Rename to transient? - case Running, - case Stopped, - case ResumablyCrashed, +pub enum State with Eq, Order, ToString { + case InTransition + case Running + case Stopped + case ResumablyCrashed case NonResumablyCrashed } diff --git a/src/main/actor/SystemEvent.flix b/src/SystemEvent.flix similarity index 68% rename from src/main/actor/SystemEvent.flix rename to src/SystemEvent.flix index e14eb69..2f52e19 100644 --- a/src/main/actor/SystemEvent.flix +++ b/src/SystemEvent.flix @@ -9,64 +9,66 @@ pub enum SystemEvent { /// /// @param the name of the actor. /// - case ActorReady(Str), + case ActorReady(String) /// /// A message that represents that an actor has terminated normally. /// /// @param the name of the actor. /// - case ActorDone(Str), + case ActorDone(String) /// /// A message that represents that the actor has crashed, but it can still be resumed. /// /// @param the name of the actor. + /// @param the resumable behavior. /// - case ActorResumableCrash(Str, ActorBehavior), + case ActorResumableCrash(String, ActorBehavior) /// /// A message that represents that the actor has crashed and cannot be resumed. /// /// @param the name of the actor. /// - case ActorNonResumableCrash(Str), + case ActorNonResumableCrash(String) /// /// A command that the actor system should start. - /// TODO: should there be a Channel[Unit] to notify that the start has completed as well? /// - case SystemStart, + case SystemStart /// /// A command that the actor system should shutdown. /// - /// @param the channel on which the `Unit` value is sent once the shutdown has completed. + /// @param the sender on which the `Unit` value is sent once the shutdown has completed. /// - case SystemShutdown(Channel[Unit]), + case SystemShutdown(Sender[Unit]) /// /// A command to send a message to a running actor. /// /// @param name: the name of the actor /// @param msg: the message to send - /// @param c: the reply channel + /// @param c: the reply sender /// - case SendMessage(Str, Message, Channel[Bool]), + case SendMessage(String, Message, Sender[Bool]) /// /// A command to get the mailbox of an actor /// /// @param name: the name of the actor - /// @param c: the reply channel - case GetActor(Str, Channel[Option[Mailbox]]), + /// @param c: the reply sender + /// + case GetActor(String, Sender[Option[Mailbox]]) /// /// A command to dynamically create a new actor + /// /// @param parent: the name of the actor that is creating another actor /// @param beh: the behavior of the created actor /// @param category: the category of the created actor - /// @param c: the reply channel + /// @param c: the reply sender /// - case CreateActor(Str, ActorBehavior, Str, Channel[Mailbox]) + case CreateActor(String, ActorBehavior, String, Sender[Mailbox]) } diff --git a/src/example/Become.flix b/src/example/Become.flix new file mode 100644 index 0000000..a313dbf --- /dev/null +++ b/src/example/Become.flix @@ -0,0 +1,70 @@ +/// +/// An actor system that demonstrates how an actor can change its behavior (i.e. "become" another actor). +/// +mod Become { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), sys -> { + // Wait for the system to start + let _ = Channel.recv(Channel.timeout(50, TimeUnit.Milliseconds)); + // Send HELO messages to trigger behavior change + discard ActorDirectory.send(sys, "greetingActor", Message.Msg("HELO", Channel.unbuffered() |> fst)); + discard ActorDirectory.send(sys, "greetingActor", Message.Msg("HELO", Channel.unbuffered() |> fst)); + discard ActorDirectory.send(sys, "greetingActor", Message.Msg("HELO", Channel.unbuffered() |> fst)); + discard ActorDirectory.send(sys, "greetingActor", Message.Msg("HELO", Channel.unbuffered() |> fst)); + discard ActorDirectory.send(sys, "greetingActor", Message.Msg("HELO", Channel.unbuffered() |> fst)); + discard ActorDirectory.send(sys, "greetingActor", Message.Msg("HELO", Channel.unbuffered() |> fst)); + // Wait for messages to be processed + let _ = Channel.recv(Channel.timeout(100, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns the actor system. + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "greetingActor" => danishGreetingActor(2) + }; + { actors = actors | empty } + } + + /// + /// An actor that prints a greeting in Danish. + /// After n greetings, it "becomes" the Swedish greeting actor. + /// + pub def danishGreetingActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + danishGreetingActor(n, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("HELO", _) => + println("Hejsa!"); + if (n > 0) + danishGreetingActor(n - 1, dir, receiver, mailbox) + else + swedishGreetingActor(dir, receiver, mailbox) + case Message.Msg(msg, _) => ActorResult.ResumableCrash(danishGreetingActor(0), "Unexpected message: ${msg}") + } + + /// + /// An actor that prints a greeting in Swedish. + /// + pub def swedishGreetingActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + swedishGreetingActor(dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("HELO", _) => + println("Hej da!"); + swedishGreetingActor(dir, receiver, mailbox) + case Message.Msg(msg, _) => ActorResult.ResumableCrash(swedishGreetingActor, "Unexpected message: ${msg}") + } + +} diff --git a/src/example/Composition.flix b/src/example/Composition.flix new file mode 100644 index 0000000..6d8d2ee --- /dev/null +++ b/src/example/Composition.flix @@ -0,0 +1,125 @@ +/// +/// An actor system that demonstrates how we can compose two actor systems into a single one. +/// +mod Composition { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = { + // Compose the two systems together. + match ActorSystem.compose(countdownSystem(), counterSystem()) { + case Some(s) => + ActorSystem.withActorSystem(s, _sys -> { + // Wait for systems to run + let _ = Channel.recv(Channel.timeout(500, TimeUnit.Milliseconds)); + () + }) + case None => println("Failed to compose actor systems (name collision)") + } + } + + /// + /// Returns an actor system with one countdown actor. + /// + pub def countdownSystem(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "countdown" => countdownActor(5) + }; + { actors = actors | empty } + } + + /// + /// An actor that performs a countdown and then explodes. + /// + pub def countdownActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Countdown] Ready..."); + Channel.send(Ready.Ready, c); + Channel.send(Message.Msg("TICK", mailbox), mailbox); + countdownActor(n, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("TICK", _) => + if (n == 0) { + println("[Countdown] KABOOM!"); + ActorResult.NonResumableCrash("KABOOM!") + } else { + println("[Countdown] Tick-Tock (${n})"); + Channel.send(Message.Msg("TICK", mailbox), mailbox); + countdownActor(n - 1, dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(countdownActor(n), "Unexpected message: ${msg}") + } + + /// + /// Returns an actor system with one counter actor and multiple worker actors. + /// + pub def counterSystem(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "counter" => counterActor(0), + "worker1" => workerActor(None), + "worker2" => workerActor(None) + }; + let dependencies = #{ + DependsOn("worker1", "counter"). + DependsOn("worker2", "counter"). + }; + { actors = actors, startPolicy = empty#startPolicy <+> dependencies | empty } + } + + /// + /// An actor that maintains an internal counter. + /// + pub def counterActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + counterActor(n, dir, receiver, mailbox) + case Message.Stop => + println("[Counter] Stopped with value: ${n}"); + ActorResult.Done + case Message.Msg("GET", replyTo) => + Channel.send(Message.Msg("${n}", mailbox), replyTo); + counterActor(n, dir, receiver, mailbox) + case Message.Msg("INC", _) => + println("[Counter] Incremented to: ${n + 1}"); + if (n < 2147483647) + counterActor(n + 1, dir, receiver, mailbox) + else + ActorResult.NonResumableCrash("Counter overflowed.") + case Message.Msg("DEC", _) => + if (n == 0) + ActorResult.ResumableCrash(counterActor(n), "Counter about to become negative.") + else + counterActor(n - 1, dir, receiver, mailbox) + case Message.Msg(msg, _) => ActorResult.ResumableCrash(counterActor(n), "Unexpected message: ${msg}") + } + + /// + /// An actor that requests the counter actor to increment its number. + /// + pub def workerActor(counterMailbox: Option[Mailbox], dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + let counter = ActorDirectory.getActor(dir, "counter"); + // Send messages to ourselves to start working + Channel.send(Message.Msg("WORK", mailbox), mailbox); + Channel.send(Message.Msg("WORK", mailbox), mailbox); + Channel.send(Message.Msg("WORK", mailbox), mailbox); + Channel.send(Ready.Ready, c); + workerActor(counter, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("WORK", _) => + match counterMailbox { + case None => ActorResult.NonResumableCrash("Where did the counterActor go?") + case Some(counter) => + Channel.send(Message.Msg("INC", mailbox), counter); + workerActor(counterMailbox, dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(workerActor(counterMailbox), "Unexpected message: ${msg}") + } + +} diff --git a/src/example/Countdown.flix b/src/example/Countdown.flix new file mode 100644 index 0000000..ceb2d98 --- /dev/null +++ b/src/example/Countdown.flix @@ -0,0 +1,52 @@ +/// +/// An actor system with a single actor that performs a countdown and then explodes. +/// The actor system can restart it for it to happen all over again. +/// +mod Countdown { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait for the countdown to complete + let _ = Channel.recv(Channel.timeout(500, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns an actor system with one countdown actor. + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "countdown" => countdownActor(5) + }; + { actors = actors | empty } + } + + /// + /// An actor that performs a countdown and then explodes. + /// + pub def countdownActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("Ready..."); + Channel.send(Ready.Ready, c); + // Send a message to ourselves to start the countdown + Channel.send(Message.Msg("TICK", mailbox), mailbox); + countdownActor(n, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("TICK", _) => + if (n == 0) { + println("KABOOM!"); + ActorResult.NonResumableCrash("KABOOM!") + } else { + println("Tick-Tock (${n})"); + Channel.send(Message.Msg("TICK", mailbox), mailbox); + countdownActor(n - 1, dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(countdownActor(n), "Unexpected message: ${msg}") + } + +} diff --git a/src/example/Counter.flix b/src/example/Counter.flix new file mode 100644 index 0000000..1f4e7a3 --- /dev/null +++ b/src/example/Counter.flix @@ -0,0 +1,93 @@ +/// +/// A simple actor system that consists of: +/// +/// - 1x counterActor that maintains an internal counter. +/// - 2x workerActors that increment the counter. +/// +mod Counter { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait a bit for the actors to process messages + let _ = Channel.recv(Channel.timeout(500, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns an actor system with one counter actor and multiple worker actors. + /// + /// The actor system uses empty policies with dependency declarations. + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "counter" => counterActor(0), + "worker1" => workerActor(None), + "worker2" => workerActor(None) + }; + let dependencies = #{ + DependsOn("worker1", "counter"). + DependsOn("worker2", "counter"). + }; + { actors = actors, startPolicy = empty#startPolicy <+> dependencies | empty } + } + + /// + /// An actor that maintains an internal counter. + /// + /// The counter can be incremented, decremented, and its value retrieved. + /// + /// The actor non-resumably crashes if the counter overflows. + /// The actor resumably crashes if the counter becomes negative. + /// + pub def counterActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + counterActor(n, dir, receiver, mailbox) + case Message.Stop => + println("Counter stopped with value: ${n}"); + ActorResult.Done + case Message.Msg("GET", replyTo) => + Channel.send(Message.Msg("${n}", mailbox), replyTo); + counterActor(n, dir, receiver, mailbox) + case Message.Msg("INC", _) => + println("Counter incremented to: ${n + 1}"); + if (n < 2147483647) + counterActor(n + 1, dir, receiver, mailbox) + else + ActorResult.NonResumableCrash("Counter overflowed.") + case Message.Msg("DEC", _) => + if (n == 0) + ActorResult.ResumableCrash(counterActor(n), "Counter about to become negative.") + else + counterActor(n - 1, dir, receiver, mailbox) + case Message.Msg(msg, _) => ActorResult.ResumableCrash(counterActor(n), "Unexpected message: ${msg}") + } + + /// + /// An actor that requests the counter actor to increment its number. + /// + pub def workerActor(counterMailbox: Option[Mailbox], dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + let counter = ActorDirectory.getActor(dir, "counter"); + // Send a message to ourselves to start working + Channel.send(Message.Msg("WORK", mailbox), mailbox); + Channel.send(Ready.Ready, c); + workerActor(counter, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("WORK", _) => + match counterMailbox { + case None => ActorResult.NonResumableCrash("Where did the counterActor go?") + case Some(counter) => + Channel.send(Message.Msg("INC", mailbox), counter); + workerActor(counterMailbox, dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(workerActor(counterMailbox), "Unexpected message: ${msg}") + } + +} diff --git a/src/example/HelloWorld.flix b/src/example/HelloWorld.flix new file mode 100644 index 0000000..4f45daa --- /dev/null +++ b/src/example/HelloWorld.flix @@ -0,0 +1,42 @@ +/// +/// An actor system with a single actor that prints hello and goodbye. +/// +mod HelloWorld { + + /// + /// The main entry point for the single actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait a bit for the actor to process messages + let _ = Channel.recv(Channel.timeout(100, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns an actor system with one hello world actor. + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "helloWorld" => helloWorld + }; + { actors = actors | empty } + } + + /// + /// An actor that prints hello world and then terminates. + /// + pub def helloWorld(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("Hello World!"); + Channel.send(Ready.Ready, c); + helloWorld(dir, receiver, mailbox) + case Message.Stop => + println("Goodbye World!"); + ActorResult.Done + case Message.Msg(msg, _) => ActorResult.ResumableCrash(helloWorld, "Unexpected message: ${msg}") + } + +} diff --git a/src/example/LoadBalancer.flix b/src/example/LoadBalancer.flix new file mode 100644 index 0000000..2211034 --- /dev/null +++ b/src/example/LoadBalancer.flix @@ -0,0 +1,132 @@ +/// +/// An actor system with an actor that sends pings to a load balancer. +/// The load balancer distributes pings to worker actors in round-robin fashion. +/// +mod LoadBalancer { + + /// + /// The main entry point for the load balancer actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), sys -> { + // Wait for initialization + let _ = Channel.recv(Channel.timeout(200, TimeUnit.Milliseconds)); + // Create a channel to receive completion notification + let (doneSender, doneReceiver) = Channel.unbuffered(); + // Start the ping actor + discard ActorDirectory.send(sys, "ping", Message.Msg("Start", doneSender)); + // Wait for completion + let _ = Channel.recv(doneReceiver); + () + }) + + /// + /// Returns an actor system with a ping actor, load balancer, and pong workers. + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "ping" => pingActor(5), + "pongMaster" => loadBalancer("pong1" :: "pong2" :: "pong3" :: Nil), + "pong1" => pongActor, + "pong2" => pongActor, + "pong3" => pongActor + }; + let dependencies = #{ + DependsOn("pongMaster", "ping"). + DependsOn("pongMaster", "pong1"). + DependsOn("pongMaster", "pong2"). + DependsOn("pongMaster", "pong3"). + }; + { actors = actors, startPolicy = empty#startPolicy <+> dependencies | empty } + } + + /// + /// An actor that sends ping messages. + /// + pub def pingActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + pingActor(n, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("Start", replyTo) => + // Start ourselves by sending a Pong message + Channel.send(Message.Msg("Pong", replyTo), mailbox); + pingActor(n, dir, receiver, mailbox) + case Message.Msg("Pong", replyTo) if n == 0 => + println("pingActor is done."); + // Notify that we're done + Channel.send(Message.Msg("Done", replyTo), replyTo); + ActorResult.Done + case Message.Msg("Pong", replyTo) => + println("Received Pong!"); + match ActorDirectory.getActor(dir, "pongMaster") { + case None => pingActor(n, dir, receiver, mailbox) + case Some(lb) => + Channel.send(Message.Msg("Ping", replyTo), lb); + println("Sent Ping!"); + pingActor(n - 1, dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(pingActor(n), "Unexpected message: ${msg}") + } + + /// + /// An actor that sends pong messages when it receives ping messages. + /// + pub def pongActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + pongActor(dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("Ping", replyTo) => + println("Received Ping!"); + match ActorDirectory.getActor(dir, "ping") { + case None => pongActor(dir, receiver, mailbox) + case Some(pingMb) => + Channel.send(Message.Msg("Pong", replyTo), pingMb); + println("Sent Pong!"); + pongActor(dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(pongActor, "Unexpected message: ${msg}") + } + + /// + /// A load balancer that distributes messages to workers in round-robin fashion. + /// + pub def loadBalancer(actorNames: List[String], dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + let workers = List.foldLeft((acc, name) -> + match ActorDirectory.getActor(dir, name) { + case Some(mb) => mb :: acc + case None => acc + }, Nil, actorNames); + Channel.send(Ready.Ready, c); + if (List.isEmpty(workers)) { + ActorResult.NonResumableCrash("Where are my workers?") + } else { + loadBalancerInitialized(workers, dir, receiver, mailbox) + } + case Message.Stop => ActorResult.Done + case Message.Msg(_, _) => ActorResult.NonResumableCrash("I haven't been initialized properly!") + } + + /// + /// The load balancer after initialization with worker references. + /// + pub def loadBalancerInitialized(workers: List[Mailbox], dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(_) => ActorResult.NonResumableCrash("Wrong initialization") + case Message.Stop => ActorResult.Done + case Message.Msg(m, replyTo) => + match workers { + case Nil => ActorResult.NonResumableCrash("Where are my workers?") + case worker :: _ => + Channel.send(Message.Msg(m, replyTo), worker); + loadBalancerInitialized(List.rotateLeft(1, workers), dir, receiver, mailbox) + } + } + +} diff --git a/src/example/Nesting.flix b/src/example/Nesting.flix new file mode 100644 index 0000000..50e2334 --- /dev/null +++ b/src/example/Nesting.flix @@ -0,0 +1,85 @@ +/// +/// An actor system that demonstrates hierarchical actor structures. +/// Each nested actor at level > 0 forwards messages to a child actor. +/// The leaf actor (level 0) responds directly. +/// +mod Nesting { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), sys -> { + // Wait for initialization + let _ = Channel.recv(Channel.timeout(200, TimeUnit.Milliseconds)); + // Create a channel for the response + let (replySender, replyReceiver) = Channel.unbuffered(); + // Send a PING and wait for PONG answer + discard ActorDirectory.send(sys, "level2", Message.Msg("PING", replySender)); + match Channel.recv(replyReceiver) { + case Message.Msg("PONG", _) => println("Received PONG from nested actor!") + case _ => println("Unexpected response!") + }; + () + }) + + /// + /// Returns an actor system with nested actors at different levels. + /// level2 -> level1 -> level0 (leaf) + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "level0" => leafActor, + "level1" => forwardingActor("level0"), + "level2" => forwardingActor("level1") + }; + let dependencies = #{ + DependsOn("level1", "level0"). + DependsOn("level2", "level1"). + }; + { actors = actors, startPolicy = empty#startPolicy <+> dependencies | empty } + } + + /// + /// A leaf actor that responds to PING with PONG. + /// + pub def leafActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Nested L0] Ready (leaf)."); + Channel.send(Ready.Ready, c); + leafActor(dir, receiver, mailbox) + case Message.Stop => + println("[Nested L0] Stopped."); + ActorResult.Done + case Message.Msg("PING", replyTo) => + println("[Nested L0] Responding with PONG."); + Channel.send(Message.Msg("PONG", replyTo), replyTo); + leafActor(dir, receiver, mailbox) + case Message.Msg(msg, _) => ActorResult.ResumableCrash(leafActor, "Unexpected message: ${msg}") + } + + /// + /// A forwarding actor that passes PING messages to a child actor. + /// + pub def forwardingActor(childName: String, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Nested] Ready, forwarding to ${childName}."); + Channel.send(Ready.Ready, c); + forwardingActor(childName, dir, receiver, mailbox) + case Message.Stop => + println("[Nested] Stopped."); + ActorResult.Done + case Message.Msg("PING", replyTo) => + println("[Nested] Forwarding PING to ${childName}."); + match ActorDirectory.getActor(dir, childName) { + case Some(child) => Channel.send(Message.Msg("PING", replyTo), child) + case None => println("[Nested] Child ${childName} not found!") + }; + forwardingActor(childName, dir, receiver, mailbox) + case Message.Msg(msg, _) => ActorResult.ResumableCrash(forwardingActor(childName), "Unexpected message: ${msg}") + } + +} diff --git a/src/example/PingPong.flix b/src/example/PingPong.flix new file mode 100644 index 0000000..021e936 --- /dev/null +++ b/src/example/PingPong.flix @@ -0,0 +1,78 @@ +/// +/// An actor system with two actors that sends pings and pongs. +/// +mod PingPong { + + /// + /// The main entry point for the single actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait a bit for the actors to process messages + let _ = Channel.recv(Channel.timeout(500, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns an actor system with a ping actor and a pong actor. + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "ping" => pingActor(2), + "pong" => pongActor(2) + }; + { actors = actors | empty } + } + + /// + /// An actor that sends ping messages. + /// + pub def pingActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + // Send a message to ourselves to trigger the first ping + Channel.send(Message.Msg("Pong", mailbox), mailbox); + pingActor(n, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("Pong", _) if n == 0 => + println("pingActor is done."); + ActorResult.Done + case Message.Msg("Pong", _) => + println("Received Pong!"); + match ActorDirectory.getActor(dir, "pong") { + case None => pingActor(n, dir, receiver, mailbox) + case Some(pongMailbox) => + Channel.send(Message.Msg("Ping", mailbox), pongMailbox); + println("Sent Ping!"); + pingActor(n - 1, dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(pingActor(n), "Unexpected message: ${msg}") + } + + /// + /// An actor that sends pong messages. + /// + pub def pongActor(n: Int32, dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + Channel.send(Ready.Ready, c); + pongActor(n, dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("Ping", _) if n == 0 => + println("pongActor is done."); + ActorResult.Done + case Message.Msg("Ping", _) => + println("Received Ping!"); + match ActorDirectory.getActor(dir, "ping") { + case None => pongActor(n, dir, receiver, mailbox) + case Some(pingMailbox) => + Channel.send(Message.Msg("Pong", mailbox), pingMailbox); + println("Sent Pong!"); + pongActor(n - 1, dir, receiver, mailbox) + } + case Message.Msg(msg, _) => ActorResult.ResumableCrash(pongActor(n), "Unexpected message: ${msg}") + } + +} diff --git a/src/example/RestartAndStop.flix b/src/example/RestartAndStop.flix new file mode 100644 index 0000000..d5a3933 --- /dev/null +++ b/src/example/RestartAndStop.flix @@ -0,0 +1,108 @@ +/// +/// An actor system with a master-worker model. +/// +/// If a worker actor crashes *once* it is restarted. If it crashes again, everything is stopped. +/// +mod RestartAndStop { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait for the system to run + let _ = Channel.recv(Channel.timeout(500, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns an actor system with a master and multiple worker actors. + /// Workers restart once on crash, but crash twice triggers full stop. + /// + /// State codes for Log: Running=1, NonResumablyCrashed=4 + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "master" => masterActor(Nil), + "worker1" => workerActor, + "worker2" => workerActor, + "worker3" => workerActor + }; + let restartPolicy = #{ + // Compute the actors that have non-resumably crashed more than once. + // Log uses Int32 state codes: Running=1, NonResumablyCrashed=4 + Blocked(x) :- + Log(x, 1, 4, timestamp1), + Log(x, 1, 4, timestamp2), + if (timestamp1 != timestamp2). + + // Restart non-resumably crashed actors that haven't crashed twice + Start(x) :- NonResumablyCrashed(x), not Blocked(x). + + // If any worker has crashed twice, stop all actors + Stop(x) :- Actor(x), Blocked("worker1"). + Stop(x) :- Actor(x), Blocked("worker2"). + Stop(x) :- Actor(x), Blocked("worker3"). + }; + { actors = actors, restartPolicy = restartPolicy | empty } + } + + /// + /// A master actor that communicates with its workers. + /// + pub def masterActor(workers: List[Mailbox], dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Master] Ready"); + // Look up worker mailboxes + let w1 = ActorDirectory.getActor(dir, "worker1"); + let w2 = ActorDirectory.getActor(dir, "worker2"); + let w3 = ActorDirectory.getActor(dir, "worker3"); + let workerList = List.filterMap(x -> x, w1 :: w2 :: w3 :: Nil); + // Start work loop + Channel.send(Message.Msg("GO", mailbox), mailbox); + Channel.send(Ready.Ready, c); + masterActor(workerList, dir, receiver, mailbox) + case Message.Stop => + println("[Master] Stopped."); + ActorResult.Done + case Message.Msg("GO", _) => + // Send work or crash message to workers + List.forEachWithIndex((i, worker) -> { + if (i == 0) { + // First worker will crash + Channel.send(Message.Msg("KABOOM", mailbox), worker) + } else { + Channel.send(Message.Msg("WORK", mailbox), worker) + } + }, workers); + // Wait a bit then send another round + let _ = Channel.recv(Channel.timeout(50, TimeUnit.Milliseconds)); + Channel.send(Message.Msg("GO", mailbox), mailbox); + masterActor(workers, dir, receiver, mailbox) + case Message.Msg(msg, _) => ActorResult.ResumableCrash(masterActor(workers), "Unexpected message: ${msg}") + } + + /// + /// A worker actor that can work or explode. + /// + pub def workerActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Worker] Ready."); + Channel.send(Ready.Ready, c); + workerActor(dir, receiver, mailbox) + case Message.Stop => + println("[Worker] Stopped."); + ActorResult.Done + case Message.Msg("WORK", _) => + println("[Worker] Working."); + workerActor(dir, receiver, mailbox) + case Message.Msg("KABOOM", _) => + println("[Worker] Kaboom!"); + ActorResult.NonResumableCrash("WHAM!") + case Message.Msg(msg, _) => ActorResult.ResumableCrash(workerActor, "Unexpected message: ${msg}") + } + +} diff --git a/src/example/RestartOnce.flix b/src/example/RestartOnce.flix new file mode 100644 index 0000000..215a3b6 --- /dev/null +++ b/src/example/RestartOnce.flix @@ -0,0 +1,60 @@ +/// +/// An actor system with an actor that crashes and is allowed to restart once. +/// If it crashes a second time, it stays crashed (blocked from restarting). +/// +mod RestartOnce { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait for the actor to crash, restart, and crash again + let _ = Channel.recv(Channel.timeout(500, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns an actor system with one unstable actor and a restart policy + /// that only allows one restart. + /// + /// State codes for Log: Running=1, NonResumablyCrashed=4 + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "unstable" => unstableActor + }; + let restartPolicy = #{ + // Compute the actors that have non-resumably crashed more than once. + // Log uses Int32 state codes: Running=1, NonResumablyCrashed=4 + Blocked(x) :- + Log(x, 1, 4, time1), + Log(x, 1, 4, time2), + if (time1 != time2). + + // Restart non-resumably crashed actors that haven't crashed twice + Start(x) :- NonResumablyCrashed(x), not Blocked(x). + }; + { actors = actors, restartPolicy = restartPolicy | empty } + } + + /// + /// An actor that explodes immediately after starting. + /// + pub def unstableActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Unstable] Ready..."); + Channel.send(Ready.Ready, c); + // Immediately send ourselves a message to crash + Channel.send(Message.Msg("KABOOM", mailbox), mailbox); + unstableActor(dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("KABOOM", _) => + println("[Unstable] Kaboom!"); + ActorResult.NonResumableCrash("WHAM!") + case Message.Msg(msg, _) => ActorResult.ResumableCrash(unstableActor, "Unexpected message: ${msg}") + } + +} diff --git a/src/example/RestartOncePer5Min.flix b/src/example/RestartOncePer5Min.flix new file mode 100644 index 0000000..4ca813d --- /dev/null +++ b/src/example/RestartOncePer5Min.flix @@ -0,0 +1,64 @@ +/// +/// An actor system with an actor that crashes and is allowed to restart +/// only if it hasn't crashed recently (within the last second for demo purposes). +/// +mod RestartOncePer5Min { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait for the actor to crash and potentially restart + let _ = Channel.recv(Channel.timeout(500, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns an actor system with one unstable actor and a restart policy + /// that only allows restarts if the actor hasn't crashed in the last second. + /// + /// State codes for Log: Running=1, NonResumablyCrashed=4 + /// Clock provides the current timestamp + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "unstable" => unstableActor + }; + let restartPolicy = #{ + // Compute the actors that have non-resumably crashed in the last second. + // In the original this was 5 minutes, but for demo purposes we use 1 second. + // Log uses Int32 state codes: Running=1, NonResumablyCrashed=4 + // Clock(now) gives the current timestamp + // Check if the crash was within 1000ms (1 second) + Blocked(x) :- + Log(x, 1, 4, timestamp), + Clock(now), + if ((now - timestamp) < 1000i64). + + // Restart non-resumably crashed actors that haven't crashed recently + Start(x) :- NonResumablyCrashed(x), not Blocked(x). + }; + { actors = actors, restartPolicy = restartPolicy | empty } + } + + /// + /// An actor that explodes immediately after starting. + /// + pub def unstableActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Unstable] Ready..."); + Channel.send(Ready.Ready, c); + // Immediately send ourselves a message to crash + Channel.send(Message.Msg("KABOOM", mailbox), mailbox); + unstableActor(dir, receiver, mailbox) + case Message.Stop => ActorResult.Done + case Message.Msg("KABOOM", _) => + println("[Unstable] Kaboom!"); + ActorResult.NonResumableCrash("WHAM!") + case Message.Msg(msg, _) => ActorResult.ResumableCrash(unstableActor, "Unexpected message: ${msg}") + } + +} diff --git a/src/example/RestartParent.flix b/src/example/RestartParent.flix new file mode 100644 index 0000000..6b1ac34 --- /dev/null +++ b/src/example/RestartParent.flix @@ -0,0 +1,86 @@ +/// +/// An actor system with a master-worker model where a crash in a worker +/// restarts both the worker and the master (parent supervision pattern). +/// +mod RestartParent { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), sys -> { + // Wait for initialization + let _ = Channel.recv(Channel.timeout(50, TimeUnit.Milliseconds)); + // Trigger a crash in worker2 + discard ActorDirectory.send(sys, "worker2", Message.Msg("OHNO", Channel.unbuffered() |> fst)); + // Wait for restart cycle + let _ = Channel.recv(Channel.timeout(200, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns the actor system with master-worker supervision. + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "master" => masterActor, + "worker1" => workerActor, + "worker2" => workerActor, + "worker3" => workerActor + }; + let dependencies = #{ + DependsOn("worker1", "master"). + DependsOn("worker2", "master"). + DependsOn("worker3", "master"). + }; + let restartPolicy = #{ + // Start a non-resumably crashed actor. + Start(x) :- NonResumablyCrashed(x). + + // Restart the master if one of its workers crashes. + Restart("master") :- NonResumablyCrashed("worker1"). + Restart("master") :- NonResumablyCrashed("worker2"). + Restart("master") :- NonResumablyCrashed("worker3"). + }; + { actors = actors, + startPolicy = ActorPolicy.emptyStartPolicy() <+> dependencies, + shutdownPolicy = ActorPolicy.emptyShutdownPolicy() <+> dependencies, + restartPolicy = restartPolicy + | empty } + } + + /// + /// A master actor. + /// + pub def masterActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Master] Ready."); + Channel.send(Ready.Ready, c); + masterActor(dir, receiver, mailbox) + case Message.Stop => + println("[Master] Stopped."); + ActorResult.Done + case Message.Msg(msg, _) => ActorResult.ResumableCrash(masterActor, "Unexpected message: ${msg}") + } + + /// + /// A worker actor. + /// + pub def workerActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Worker] Ready."); + Channel.send(Ready.Ready, c); + workerActor(dir, receiver, mailbox) + case Message.Stop => + println("[Worker] Stopped."); + ActorResult.Done + case Message.Msg("OHNO", _) => + println("[Worker] Crashed."); + ActorResult.NonResumableCrash("WHAM!") + case Message.Msg(msg, _) => ActorResult.ResumableCrash(workerActor, "Unexpected message: ${msg}") + } + +} diff --git a/src/example/StartupAndShutdown.flix b/src/example/StartupAndShutdown.flix new file mode 100644 index 0000000..8d44414 --- /dev/null +++ b/src/example/StartupAndShutdown.flix @@ -0,0 +1,131 @@ +/// +/// An example that demonstrates how actors can be started and stopped in an orderly fashion +/// based on their dependency relationships. +/// +mod StartupAndShutdown { + + /// + /// The main entry point of the actor system. + /// + pub def runExample(): Unit \ {Chan, NonDet, IO} = + ActorSystem.withActorSystem(system(), _sys -> { + // Wait for all actors to start in order + let _ = Channel.recv(Channel.timeout(200, TimeUnit.Milliseconds)); + () + }) + + /// + /// Returns the actor system with complex dependencies. + /// Actors are started in dependency order (leaf dependencies first). + /// Actors are stopped in reverse dependency order (dependents first). + /// + pub def system(): ActorSystem = { + let empty = ActorSystem.empty(); + let actors = Map#{ + "authenticationActor" => authenticationActor, + "databaseActor" => databaseActor, + "loadBalanceActor" => loadBalanceActor, + "webworker1" => webWorkerActor, + "webworker2" => webWorkerActor, + "webworker3" => webWorkerActor, + "loggingActor" => loggingActor + }; + // The actors are started and stopped according to their dependencies. + // Start order: loggingActor -> databaseActor -> authenticationActor -> loadBalanceActor -> webworkers + // Stop order: webworkers -> loadBalanceActor -> authenticationActor -> databaseActor -> loggingActor + let dependencies = #{ + DependsOn("databaseActor", "loggingActor"). + + DependsOn("authenticationActor", "databaseActor"). + DependsOn("authenticationActor", "loggingActor"). + + DependsOn("loadBalanceActor", "authenticationActor"). + DependsOn("loadBalanceActor", "databaseActor"). + DependsOn("loadBalanceActor", "loggingActor"). + + DependsOn("webworker1", "loadBalanceActor"). + DependsOn("webworker2", "loadBalanceActor"). + DependsOn("webworker3", "loadBalanceActor"). + }; + { actors = actors, + startPolicy = ActorPolicy.emptyStartPolicy() <+> dependencies, + shutdownPolicy = ActorPolicy.emptyShutdownPolicy() <+> dependencies + | empty } + } + + /// + /// An authentication actor. + /// + pub def authenticationActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Authentication] Ready."); + Channel.send(Ready.Ready, c); + authenticationActor(dir, receiver, mailbox) + case Message.Stop => + println("[Authentication] Stopped."); + ActorResult.Done + case Message.Msg(msg, _) => ActorResult.ResumableCrash(authenticationActor, "Unexpected message: ${msg}") + } + + /// + /// A database actor. + /// + pub def databaseActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Database] Ready."); + Channel.send(Ready.Ready, c); + databaseActor(dir, receiver, mailbox) + case Message.Stop => + println("[Database] Stopped."); + ActorResult.Done + case Message.Msg(msg, _) => ActorResult.ResumableCrash(databaseActor, "Unexpected message: ${msg}") + } + + /// + /// A load balance actor. + /// + pub def loadBalanceActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[LoadBalance] Ready."); + Channel.send(Ready.Ready, c); + loadBalanceActor(dir, receiver, mailbox) + case Message.Stop => + println("[LoadBalance] Stopped."); + ActorResult.Done + case Message.Msg(msg, _) => ActorResult.ResumableCrash(loadBalanceActor, "Unexpected message: ${msg}") + } + + /// + /// A web worker actor. + /// + pub def webWorkerActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[WebWorker] Ready."); + Channel.send(Ready.Ready, c); + webWorkerActor(dir, receiver, mailbox) + case Message.Stop => + println("[WebWorker] Stopped."); + ActorResult.Done + case Message.Msg(msg, _) => ActorResult.ResumableCrash(webWorkerActor, "Unexpected message: ${msg}") + } + + /// + /// A logging actor. + /// + pub def loggingActor(dir: ActorDirectory, receiver: Receiver[Message], mailbox: Mailbox): ActorResult[String] \ {Chan, NonDet, IO} = + match Channel.recv(receiver) { + case Message.Start(c) => + println("[Logging] Ready."); + Channel.send(Ready.Ready, c); + loggingActor(dir, receiver, mailbox) + case Message.Stop => + println("[Logging] Stopped."); + ActorResult.Done + case Message.Msg(msg, _) => ActorResult.ResumableCrash(loggingActor, "Unexpected message: ${msg}") + } + +} diff --git a/src/main/actor/ActorBehavior.flix b/src/main/actor/ActorBehavior.flix deleted file mode 100644 index 9b0eae3..0000000 --- a/src/main/actor/ActorBehavior.flix +++ /dev/null @@ -1,4 +0,0 @@ -/// -/// An actor behavior is a function from a mailbox to an actor result. -/// -type alias ActorBehavior = ActorDirectory -> Mailbox -> ActorResult[Str] diff --git a/src/main/actor/ActorDirectory.flix b/src/main/actor/ActorDirectory.flix deleted file mode 100644 index d267409..0000000 --- a/src/main/actor/ActorDirectory.flix +++ /dev/null @@ -1,38 +0,0 @@ -/// -/// An actor directory, used to retrieve mailboxes of actors in the system -/// -type alias ActorDirectory = RunningActorSystem - -// TODO: do we need to decouple that from ActorSystem? They could just be merged. -namespace ActorDirectory { - /// - /// Returns an actor mailbox (blocking function) - /// - pub def getActor(s: ActorDirectory, name: Str): Option[Mailbox] & Impure = { - let RunningActorSystem(sys) = s; - let c = chan Option[Mailbox] 0; - sys <- GetActor(name, c); - <- c - } - - /// - /// Sends a message to an actor, given its name - /// - pub def send(s: RunningActorSystem, actorName: Str, msg: Message): Bool & Impure = { - let RunningActorSystem(sys) = s; - let c = chan Bool 0; - sys <- SendMessage(actorName, msg, c); - <- c - } - - /// - /// Dynamically create a new actor, given its parent name, its behavior, and category - /// - pub def create(s: RunningActorSystem, parent: Str, behavior: ActorBehavior, category: Str): Mailbox & Impure = { - let RunningActorSystem(sys) = s; - let c = chan Mailbox 0; - sys <- CreateActor(parent, behavior, category, c); - <- c - } -} - diff --git a/src/main/actor/ActorPolicy.flix b/src/main/actor/ActorPolicy.flix deleted file mode 100644 index 33380c0..0000000 --- a/src/main/actor/ActorPolicy.flix +++ /dev/null @@ -1,191 +0,0 @@ -/// -/// A type alias for the row type of an actor policy. -/// -type alias ActorPolicy = #{Actor, ActorState, DependsOn, Start, Stop, Resume, Restart, Log, Clock, Blocked, Sibling, Created, Category, ActorsNotStopped()} - -// TODO: Could we have the start policy only using the Start predicate. And the stop policy NOT using the start predicate. -// TODO: But we do need the join for the "restartPolicy" (which maybe we should rename to "runningPolicy") -// TODO: The whole actor system progresses through start -> running -> stop - - -/// -/// A relation that represents actors in the system. -/// -pub rel Actor(name: Str) - -/// -/// A relation that relates two actors with the same parent -/// -pub rel Sibling(name1: Str, name2: Str) - -/// -/// A relation that represents the parent-child relationship between dynamic actors -/// -pub rel Created(parent: Str, child: Str) - -/// -/// A relation to categorize actors. -/// -pub rel Category(name: Str, category: Str) - -/// -/// A relation that represents the dependencies between actors. -/// -/// If an actor `x` depends on an actor `y` then `x` is started before `y`. -/// -/// If there is a cyclic dependency then both actors are started at the same time. -/// -pub rel DependsOn(name1: Str, name2: Str) - -/// -/// A relation used to specify the actors that should be started by an actor policy. -/// -pub rel Start(name: Str) - -/// -/// A relation used to specify the actors that should be stopped by an actor policy. -/// -pub rel Stop(name: Str) - -/// -/// A relation used to specify the actors that should be restarted (with a fresh state) by an actor policy. -pub rel Restart(name: Str) - -/// -/// A relation used to specify the actors that should be resume (from their last known good state) by an actor policy. -/// -pub rel Resume(name: Str) - -/// -/// A relation that tracks past actor transitions. -/// -pub rel Log(name: Str, last: State, next: State, timestamp: Instant) // TODO: Maybe it would be useful if we extend this with a "nanoseconds since event" for easy filtering. - -/// -/// A relation that holds the current time. -/// -pub rel Clock(timestamp: Instant) - -/// -/// If an actor policy specifies multiple actions for the same actor then the final action is computed by join: -/// -/// Start, Start => Start -/// Start, Stop => Start -/// Stop, Start => Start -/// Resume, Stop => Resume -/// Stop, Resume => Resume -/// Start, Resume => Resume -/// Resume, Start => Resume -/// x, x => x -/// - -/// -/// An action is combined with an actor state according to the following table: -/// -/// <: -/// - Start => start the actor. -/// - Stop => no-op. -/// - Resume => no-op. -/// InTransition (actor is changing its state): -/// - Start => no-op. -/// - Stop => no-op. -/// - Resume => no-op. -/// Running: -/// - Start => no-op. -/// - Stop => stop the actor. -/// - Resume => no-op. -/// Stopped: -/// - Start => start the actor. -/// - Stop => no-op. -/// - Resume => start the actor (with the initial state). -/// ResumablyCrashed: -/// - Start => restart the actor with the initial state. (required to allow a reset of the actor) -/// - Stop => change state to stopped (clears the error) (required to allow the error to be cleared). -/// - Resume => resume. -/// NonResumablyCrashed: -/// - Start => start the actor with initial state. -/// - Stop => change state to stopped (clears the error). -/// - Resume => start the actor with initial state. -/// - -/// -/// A predicate that describes the current state of an actor. -/// -/// @param name the name of the actor. -/// @param state the state of the actor. An actor has exactly one state. -/// -pub rel ActorState(name: Str, state: State) - -/// -/// Auxiliary relation: An actor is blocked if one or more of its (transitive) dependencies is not yet running. -/// -rel Blocked(name: Str) // TODO: Rename to wait? - -/// -/// Returns the empty start policy. -/// -pub def emptyStartPolicy(): ActorPolicy = #{ - // Compute the transitive closure of the DependsOn relation. - DependsOn(x, z) :- DependsOn(x, y), DependsOn(y, z). - - // Compute the actors that are waiting for one or more of its dependencies to be running. - // Note1: An actor is *not* blocked if it depends on itself. This allows actors in cyclic dependencies to be started. - // Note2: An actor that has no dependencies is trivially *never* blocked. - Blocked(x) :- DependsOn(x, y), not ActorState(y, Running), not DependsOn(x, x). - - // Start actors that are not blocked. - Start(x) :- Actor(x), not Blocked(x). -} - -/// -/// Returns a policy that immediately starts all actors regardless of their dependencies. -/// -pub def immediatelyStartAllPolicy(): ActorPolicy = #{ - Start(actor) :- Actor(actor). -} - -/// -/// Returns the empty shutdown policy. -/// -pub def emptyShutdownPolicy(): ActorPolicy = #{ - // Compute the transitive closure of the DependsOn relation. - DependsOn(x, z) :- DependsOn(x, y), DependsOn(y, z). - - // Compute the actors that are waiting for one or more if its dependencies to be stopped. - // Note1: An actor is *not* blocked if it depends on itself. This allows actors in cyclic dependencies to be stopped. - // Note2: An actor that no-one depends on is trivially *never* blocked. - Blocked(x) :- DependsOn(y, x), ActorState(y, Running), not DependsOn(x, x). - - // Stop actors that are not blocked. - Stop(x) :- Actor(x), ActorState(x, Running), not Blocked(x). -} - -/// -/// Returns a policy that immediately shuts down all actors regardless of their dependencies. -/// -pub def immediatelyShutdownAllPolicy(): ActorPolicy = #{ - Stop(actor) :- ActorState(actor, Running). -} - -/// -/// Returns the empty restart policy. -/// -pub def emptyRestartPolicy(): ActorPolicy = oneForOneRestartPolicy() - -/// -/// Returns a restart policy that restarts an actor when it crashes. -/// -pub def oneForOneRestartPolicy(): ActorPolicy = #{ - Resume(x) :- ActorState(x, ResumablyCrashed). - Start(x) :- ActorState(x, NonResumablyCrashed). -} - -/// -/// Returns a restart policy that restarts an actor along with its siblings when it crashes. -/// -pub def oneForAllRestartPolicy(): ActorPolicy = oneForOneRestartPolicy() <+> #{ - Start(x) :- Sibling(x, y), ActorState(y, ResumablyCrashed). - Start(x) :- Sibling(x, y), ActorState(y, NonResumablyCrashed). -} - -// TODO: Magnus: Implement more policies. \ No newline at end of file diff --git a/src/main/actor/ActorSystem.flix b/src/main/actor/ActorSystem.flix deleted file mode 100644 index 100b41c..0000000 --- a/src/main/actor/ActorSystem.flix +++ /dev/null @@ -1,479 +0,0 @@ -/// -/// An actor system is a type alias for a record that consists of four fields: -/// -/// @field `actors` that declares the actors in the system. -/// @field `startPolicy` that describes the policy for how actors are started. -/// @field `restartPolicy` that describes the policy for how actors are restarted if they crash. -/// @field `shutdownPolicy` that describes the policy for how actors are stopped. -/// -type alias ActorSystem = { - actors: Map[Str, ActorBehavior], - startPolicy: ActorPolicy, - restartPolicy: ActorPolicy, - shutdownPolicy: ActorPolicy -} - -/// -/// A running actor system is a type alias for a channel of actor events. -/// -opaque type RunningActorSystem = Channel[SystemEvent] - -/// -/// An actor is identified by its mailbox -/// -type alias ActorRef = Mailbox // TODO: Quentin: Maybe a name and a mailbox? - -namespace ActorSystem { - - /// - /// Returns an actor system with a empty configuration. - /// - pub def empty(): ActorSystem = { - actors = Map#{}, - startPolicy = emptyStartPolicy(), - shutdownPolicy = emptyShutdownPolicy(), - restartPolicy = emptyRestartPolicy() - } - - /// - /// Returns the empty mailbox capacity. - /// - pub def emptyMailboxCapacity(): Int = 1000 * 1000 - - enum InternalActorState { - /// - /// An actor that has not been started previously, and has no mailbox - /// - case NotStarted, - /// - /// An actor that has been started and been assigned a mailbox. The actor could be stopped or crashed now. - /// - case Started(State, ActorRef) - } - - type alias ActorMap = { - // The actors running - map: Map[Str, (InternalActorState, ActorBehavior, Option[ActorBehavior])], - // The log of all events that happened - log: List[(Str, State, State, Instant)], - // The actors that are currently restarting - restarting: List[Str], - // Actors that have been dynamically created, with their parent and category - dynamicActors: List[(Str, Str, Str)] - } - - /// - /// Return the current state of the actors in the system, using the ActorState, Log, and Actor relations. - /// - def statesOfActors(s: ActorSystem, actors: ActorMap): ActorPolicy & Impure = { - // Builds the ActorState relation - let statesRel = Map.foldLeftWithKey((states, actorName, actor) -> - let (internalState, _, _) = actor; - match internalState { - case NotStarted => states <+> #{ActorState(actorName, Stopped).} - case Started(st, _) => states <+> #{ActorState(actorName, st).} - }, #{}, actors.map); - // Builds the Log relation - let logRel = List.foldLeft((log, data) -> - let (actorName, stateBefore, stateAfter, instant) = data; - log <+> #{ Log(actorName, stateBefore, stateAfter, instant). }, - #{}, actors.log); - // Builds the Actor relation - let actorsRel = Set.foldLeft((rel, actor: Str) -> { - rel <+> #{Actor(actor).} }, - #{}, Map.keysOf(s.actors)); - // Builds the Created and Category relations, and fills the Actor relation with dynamic actors - let createdRel = List.foldLeft((rel, dyn) -> - let (name, parent, category) = dyn; - rel <+> #{ Actor(name). Created(parent, name). Category(name, category). }, - #{}, actors.dynamicActors); - // Add the rule for the Sibling relation - let sibling = #{Sibling(x, y) :- Actor(parent), Created(parent, x), Created(parent, y).}; - let rightNow = Instant.now(); - statesRel <+> logRel <+> actorsRel <+> createdRel <+> sibling <+> #{ Clock(rightNow). } - } - - rel ActorsNotStopped() - /// - /// Checks that all the actors are in the Stopped state - /// - def allActorsStopped(state: ActorPolicy): Bool = - !((solve (state <+> #{ - ActorsNotStopped() :- ActorState(_, InTransition). - ActorsNotStopped() :- ActorState(_, Running). - // Crashed actors are considered stopped - })) |= ActorsNotStopped().) - - /// - /// Starts an actor given its name, behavior, and mailbox. Returns the updated map of running actors - /// - def startActor(actors: ActorMap, control: Channel[SystemEvent], actorName: Str, behavior: ActorBehavior, mailbox: Channel[Message]): ActorMap & Impure = { - // Create the process of the new actor - spawn ({ - match behavior(RunningActorSystem(control), mailbox) { - case Done => - // Notify on the control channel that the actor is done - control <- ActorDone(actorName) - case ResumableCrash(resumableBehavior, _) => - // Notify that the actor has crashed - control <- ActorResumableCrash(actorName, resumableBehavior) - case NonResumableCrash(_) => - // Notify that the actor has crashed - control <- ActorNonResumableCrash(actorName) - } - }); - // The channel on which the actor will notify us that is has successfully initialized - let readyChan = chan Ready 0; - // Tell the actor to initialize - mailbox <- Start(readyChan); - spawn ({ - // Wait (on a different process) for the actor to fully initialize - // TODO: what if the actor crashes before being ready? Should we use a timeout? For now we assume it can't crash. - <- readyChan; - // Then notify the actor system - control <- ActorReady(actorName) - }); - // Add the actor to the map of running actors. It is now in transition. - { map = Map.insert(actorName, (Started(InTransition, mailbox), behavior, None), actors.map), - log = match Map.get(actorName, actors.map) { - case Some((Started(st, _), _, _)) => (actorName, st, InTransition, Instant.now()) - case _ => (actorName, Stopped, InTransition, Instant.now()) - } :: actors.log - | actors } - } - - /// - /// Runs the shutdown policy of an actor system. - /// Returns the updated actor map after one iteration of the policy, or None if the system should stop. - /// - def runShutdownPolicy(s: ActorSystem, actors: ActorMap, _control: Channel[SystemEvent], notifyChan: Channel[Unit]): Option[ActorMap] & Impure = { - Console.printLine("[system] shutdown policy"); - let currentState = statesOfActors(s, actors); - let (actors2, n) = fold Stop (actors, 0) ((actorName, acc) -> // TODO: Fold requires a pure function and this is not pure, hence all the casts. - let (m, n) = acc; // m is the number of actors (really) stopped in this iteration - // Console.printLine("[system] have to stop actor: " + actorName); - - match Map.get(actorName, m.map) { - case Some((Started(Running, mailbox), _, _)) => { - Console.printLine("!!!!! [system] sending stop to " + actorName); - // Running, Stop => stop the actor - mailbox <- Stop; // actor will terminate and will notify the actor system - (m, n+1) - } as & Pure - - case Some((Started(ResumablyCrashed, mailbox), behavior, resumableBehavior)) => { - // ResumablyCrashed, Stop => change state to stopped - Console.printLine("[system] setting crashed1 to stop: " + actorName); - ({ map = Map.insert(actorName, (Started(Stopped, mailbox), behavior, resumableBehavior), m.map), - log = (actorName, ResumablyCrashed, Stopped, Instant.now()) :: m.log - | m}, n) - } as & Pure - - case Some((Started(NonResumablyCrashed, mailbox), behavior, resumableBehavior)) => { - // NonResumablyCrashed, Stop => change state to stopped - Console.printLine("[system] setting crashed2 to stop: " + actorName); - ({ map = Map.insert(actorName, (Started(Stopped, mailbox), behavior, resumableBehavior), m.map), - log = (actorName, NonResumablyCrashed, Stopped, Instant.now()) :: m.log - | m}, n) - } as & Pure - - case _ => { - Console.printLine("[system] non-case for : " + actorName); - // Other cases are no-ops - (m, n) - } as & Pure - }) (solve (s.restartPolicy<+> s.shutdownPolicy <+> currentState)); - if (n == 0 && allActorsStopped(currentState)) { - Console.printLine("[system] fully stopped"); - // System fully stopped - notifyChan <- (); // Notify the requester of the shutdown - None - } else { - Console.printLine("[system] still some actors to terminate"); - // Some actors still have to be stopped - Some(actors2) - } - } - - /// If an actor policy specifies multiple actions for the same actor then the final action is computed by join (which is symmetric) - /// - /// Start, Stop => Start - /// Restart, Stop => Restart - /// Restart, Start => Restart - /// Stop, Resume => Resume - /// Restart, Resume => Resume - /// Start, Resume => Resume - /// x, x => x - /// - - enum ActorAction { - case ActionResume, - case ActionStart, - case ActionRestart, - case ActionStop - } - def runRestartPolicy(s: ActorSystem, actors: ActorMap, control: Channel[SystemEvent]): ActorMap & Impure = { - let currentState = statesOfActors(s, actors); - // We just accumulate for each actor, whether it should 1. Stop, 2. Start, 3. Resume, overriding any previous value. This corresponds to the increasing order of the sop/start/resume lattice. - let m0: Map[Str, ActorAction] = Map#{}; - let policy = solve (s.startPolicy <+> s.restartPolicy <+> currentState); - // The order of the following folds are important: the later folds will take precedence over the previous ones in case multiple state changes for the same actor are requested by the policy. - let m1 = fold Stop m0 ((actorName, m) -> Map.insert(actorName, ActionStop, m)) policy; - let m2 = fold Start m1 ((actorName, m) -> Map.insert(actorName, ActionStart, m)) policy; - let m3 = fold Restart m2 ((actorName, m) -> Map.insert(actorName, ActionRestart, m)) policy; - let m4 = fold Resume m3 ((actorName, m) -> Map.insert(actorName, ActionResume, m)) policy; - // Then, we just apply the action for each actor, according to the table. - Map.foldLeftWithKey((m, actorName, action) -> - match action { - case ActionStart => match Map.get(actorName, m.map) { - case Some((NotStarted, behavior, _)) => - // Start the actor with a new mailbox - let mailbox = chan Message emptyMailboxCapacity(); - startActor(m, control, actorName, behavior, mailbox) - case Some((Started(Stopped, mailbox), behavior, _)) => - // Start the actor, reusing the mailbox - startActor(m, control, actorName, behavior, mailbox) - case Some((Started(ResumablyCrashed, mailbox), behavior, _)) => - // Start the actor with the initial state, but reusing the mailbox - startActor(m, control, actorName, behavior, mailbox) - case Some((Started(NonResumablyCrashed, mailbox), behavior, _)) => - // Start the actor with the initial state, reusing the mailbox - startActor(m, control, actorName, behavior, mailbox) - case _ => - // No-op - m - } - case ActionRestart => match Map.get(actorName, m.map) { - case Some((Started(Running, mailbox), _, _)) => - // If the actor is running, stop it first - mailbox <- Stop; - // Remember that the actor has to be restarted after it successfully stopped - { restarting = actorName :: m.restarting | m } - case Some((Started(Stopped, mailbox), behavior, _)) => - startActor(m, control, actorName, behavior, mailbox) - case Some((Started(ResumablyCrashed, mailbox), behavior, _)) => - // Start the actor from a fresh initial state, ignores the resumable behavior - startActor(m, control, actorName, behavior, mailbox) - case Some((Started(NonResumablyCrashed, mailbox), behavior, _)) => - startActor(m, control, actorName, behavior, mailbox) - case _ => - // No-op - // TODO: is that correct? This is only for the case of an actor that hasn't been started, hence "restarting" it does not really make sense - m - } - case ActionResume => match Map.get(actorName, m.map) { - case Some((Started(Stopped, mailbox), behavior, _)) => - // Start the actor with the initial state, reusing the mailbox - startActor(m, control, actorName, behavior, mailbox) - case Some((Started(ResumablyCrashed, mailbox), behavior, resumableBehavior)) => - startActor(m, control, actorName, Option.getWithDefault(resumableBehavior, behavior), mailbox) - case Some((Started(NonResumablyCrashed, mailbox), behavior, _)) => - // Start the actor with initial state - startActor(m, control, actorName, behavior, mailbox) - case _ => - // No-op - m - } - case ActionStop => match Map.get(actorName, m.map) { - case Some((Started(Running, mailbox), _, _)) => - // Stop the actor - mailbox <- Stop; // actor will terminate and will notify the actor system - m - case Some((Started(ResumablyCrashed, mailbox), behavior, resumableBehavior)) => - // Change the state of the crashed actor to stopped - { map = Map.insert(actorName, (Started(Stopped, mailbox), behavior, resumableBehavior), m.map), - log = (actorName, ResumablyCrashed, Stopped, Instant.now()) :: m.log - | m } - case Some((Started(NonResumablyCrashed, mailbox), behavior, resumableBehavior)) => - // Change the state of the crashed actor to stopped - { map = Map.insert(actorName, (Started(Stopped, mailbox), behavior, resumableBehavior), m.map), - log = (actorName, NonResumablyCrashed, Stopped, Instant.now()) :: m.log - | m } - case _ => - // No-op - m - } - }, actors, m4) - } - - enum Policy { - case PolicyNotStarted, // Not started yet - case PolicyRunning, // Fully running - case PolicyStopping(Channel[Unit]) // Currently stopping, with the channel to notify when everything has safely stopped - } - - /// - /// Restart all actors that are "restarting" (have been send a Stop signal) and have successfully stopped or crashed. - /// - def startRestartingActors(actors: ActorMap, control: Channel[SystemEvent]): ActorMap & Impure = { - let (actors2, restarting) = List.foldLeft((acc, actorName) -> - let (m, notRestarted) = acc; - match Map.get(actorName, m.map) { - case Some((Started(Stopped, mailbox), behavior, _)) => - // The actor successfully stopped, start it again - (startActor(m, control, actorName, behavior, mailbox), notRestarted) - case Some((Started(ResumablyCrashed, mailbox), behavior, _)) => - // The actor crashed (probably when stopping), start it again - (startActor(m, control, actorName, behavior, mailbox), notRestarted) - case Some((Started(NonResumablyCrashed, mailbox), behavior, _)) => - // The actor crashed (probably when stopping), start it again - (startActor(m, control, actorName, behavior, mailbox), notRestarted) - case _ => - // The actor is stil running, don't start it now. - (m, actorName :: notRestarted) - }, (actors, Nil), actors.restarting); - // Only keep actors that have not been restarted as "restarting" actors - { restarting = restarting | actors2 } - } - - /// - /// Run the current policy (@param `policy`). Returns the updated actor map, as well as the next state of the policy. - /// - def runPolicy(s: ActorSystem, actors: ActorMap, control: Channel[SystemEvent], policy: Policy): Option[(ActorMap, Policy)] & Impure = { - let r = match policy { - case PolicyNotStarted => Some((actors, policy)) - case PolicyRunning => Some((runRestartPolicy(s, actors, control), PolicyRunning)) - case PolicyStopping(c) => Option.map((a) -> (a, PolicyStopping(c)), runShutdownPolicy(s, actors, control, c)) - }; - // Restart the actors that have to be restarted - Option.map(res -> (startRestartingActors(fst(res), control), snd(res)), r) - } - - def runPolicyAndRecurse(s: ActorSystem, actors: ActorMap, control: Channel[SystemEvent], policy: Policy): Unit & Impure = - match runPolicy(s, actors, control, policy) { - case Some((actors2, policy2)) => actorSystem(s, actors2, control, policy2) - case None => () - } - - /// - /// The main loop of the actor system, handling messages over the control channel - /// - def actorSystem(s: ActorSystem, actors: ActorMap, control: Channel[SystemEvent], policy: Policy): Unit & Impure = select { - case event <- control => match event { - case SystemStart => - //assert!(policy == PolicyNotStarted); // Otherwise, the actor system has been started more than once. - runPolicyAndRecurse(s, actors, control, PolicyRunning) - case SystemShutdown(c) => - runPolicyAndRecurse(s, actors, control, PolicyStopping(c)) - case SendMessage(actorName, msg, c) => - match Map.get(actorName, actors.map) { - case Some((Started(Running, mailbox), _, _)) => - mailbox <- msg; - c <- true - case Some(_) => c <- false // Actor not running - case _ => c <- false // Actor not found - }; - // We don't run the policy here because that's an external event and had no way of making the actor system progress. - actorSystem(s, actors, control, policy) - case GetActor(actorName, c) => - let mb = Option.flatMap(act -> match act { - case (Started(st, mailbox), _, _) => - Some(mailbox) - case _ => None - }, Map.get(actorName, actors.map)); - c <- mb; - actorSystem(s, actors, control, policy) - case CreateActor(parent, beh, category, replyChan) => - import java.time.Instant.toString(); - import java.time.Instant:now(); - let mailbox = chan Message emptyMailboxCapacity(); - let actorName = category + (toString(now())); - let actors2 = startActor(actors, control, actorName, beh, mailbox); - let actors3 = { dynamicActors = (actorName, parent, category) :: actors2.dynamicActors | actors2 }; - replyChan <- mailbox; - runPolicyAndRecurse(s, actors3, control, policy) - case ActorReady(actorName) => - let actors2 = match Map.get(actorName, actors.map) { - case Some((Started(InTransition, mailbox), behavior, resumableBehavior)) => - // The actor has initialized and is now running - { map = Map.insert(actorName, (Started(Running, mailbox), behavior, resumableBehavior), actors.map), - log = (actorName, InTransition, Running, Instant.now()) :: actors.log - | actors } - case _ => - // Ignore ready message if the actor is not in transition, there probably has been a race that we can safely ignore - actors - }; - runPolicyAndRecurse(s, actors2, control, policy) - case ActorDone(actorName) => - let actors2 = match Map.get(actorName, actors.map) { - case Some((Started(Running, mailbox), behavior, resumableBehavior)) => - { map = Map.insert(actorName, (Started(Stopped, mailbox), behavior, resumableBehavior), actors.map), - log = (actorName, Running, Stopped, Instant.now()) :: actors.log - | actors } - case _ => - // TODO: When do these other case happen? If the actor is done, it should not be in any other state than started. - ??? - }; - runPolicyAndRecurse(s, actors2, control, policy) - case ActorResumableCrash(actorName, resumableBehavior) => - let actors2 = match Map.get(actorName, actors.map) { - case Some((Started(st, mailbox), behavior, _)) => - { map = Map.insert(actorName, (Started(ResumablyCrashed, mailbox), behavior, Some(resumableBehavior)), actors.map), - log = (actorName, st, ResumablyCrashed, Instant.now()) :: actors.log - | actors } - case _ => - // TODO: actor should only crash if it was running before - // TODO: what about InTransition? An actor could crash while initializing - ??? - }; - runPolicyAndRecurse(s, actors2, control, policy) - case ActorNonResumableCrash(actorName) => - let actors2 = match Map.get(actorName, actors.map) { - case Some((Started(st, mailbox), behavior, resumableBehavior)) => - { map = Map.insert(actorName, (Started(NonResumablyCrashed, mailbox), behavior, resumableBehavior), actors.map), - log = (actorName, st, NonResumablyCrashed, Instant.now()) :: actors.log - | actors } - case _ => - // TODO: same as for resumable crash - ??? - }; - runPolicyAndRecurse(s, actors2, control, policy) - } - case _ <- Timer.seconds(1i64) => - // Run the policy every second if nothing else happened - runPolicyAndRecurse(s, actors, control, policy) - } - - /// - /// Starts the given actor system `s`. Returns the updated actor system - /// - pub def start(s: ActorSystem): RunningActorSystem & Impure = { - let control = chan SystemEvent 10; - let actors = Map.foldLeftWithKey((m, actorName, behavior) -> - Map.insert(actorName, (NotStarted, behavior, None), m), - Map#{}, s.actors); - spawn (actorSystem(s, { map = actors, log = Nil, restarting = Nil, dynamicActors = Nil }, control, PolicyNotStarted)); - control <- SystemStart; - RunningActorSystem(control) - } - - /// - /// Composes two actor systems into a single one, if they can be joined. - /// Names of actors in both systems have to be distinct. - /// Policies are joined, which might not be what you want. In that case, it is a good idea to redefine the policies that have to be, after composing the systems. - /// - pub def compose(s1: ActorSystem, s2: ActorSystem): Option[ActorSystem] = - if (Map.isEmpty(Map.intersection(s1.actors, s2.actors))) { - // No shared names, we can join - Some ({ - actors = Map.unionWith((v1, _) -> v1 /* should never be called */, s1.actors, s2.actors), - startPolicy = s1.startPolicy <+> s2.startPolicy, - restartPolicy = s1.restartPolicy <+> s2.restartPolicy, - shutdownPolicy = s1.shutdownPolicy <+> s2.shutdownPolicy - }) - } else { - // There are some names shared so we can't join the systems - None - } - - /// - /// Initiates a shutdown of the given actor system `s` and waits for its orderly shutdown. - /// - pub def shutdown(s: RunningActorSystem): Unit & Impure = { - let RunningActorSystem(sys) = s; - let c = chan Unit 0; - sys <- SystemShutdown(c); - <- c - } -} - diff --git a/src/main/actor/Mailbox.flix b/src/main/actor/Mailbox.flix deleted file mode 100644 index 18681ff..0000000 --- a/src/main/actor/Mailbox.flix +++ /dev/null @@ -1,4 +0,0 @@ -/// -/// A mailbox is a type alias for a channel of messages. -/// -type alias Mailbox = Channel[Message] diff --git a/src/main/actor/Ready.flix b/src/main/actor/Ready.flix deleted file mode 100644 index c1ef833..0000000 --- a/src/main/actor/Ready.flix +++ /dev/null @@ -1,6 +0,0 @@ -/// -/// A special message sent by an actor to indicate that it is ready to receive messages. -/// -pub enum Ready { - case Ready // TODO: Extend with another message: Unavailable which means that the actor cannot start? If so, rename the enum -} diff --git a/test/TestMain.flix b/test/TestMain.flix new file mode 100644 index 0000000..729fe48 --- /dev/null +++ b/test/TestMain.flix @@ -0,0 +1,2 @@ +@Test +def test01(): Bool = 1 + 1 == 2 From 35a3db01890d9c562eb72f900cbf3f81847dbd0d Mon Sep 17 00:00:00 2001 From: Magnus Madsen Date: Wed, 3 Dec 2025 13:44:23 +0100 Subject: [PATCH 2/2] chore: update gitignore --- .claude/settings.local.json | 15 +++++++++++++++ .gitignore | 3 +++ 2 files changed, 18 insertions(+) create mode 100644 .claude/settings.local.json diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..c58e98f --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,15 @@ +{ + "permissions": { + "allow": [ + "Bash(java -jar flix.jar build:*)", + "Bash(1)", + "Bash(grep:*)", + "WebSearch", + "WebFetch(domain:api.flix.dev)", + "Bash(java -jar flix.jar run:*)", + "WebFetch(domain:doc.flix.dev)" + ], + "deny": [], + "ask": [] + } +} diff --git a/.gitignore b/.gitignore index 45884c4..7bea36e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ /.idea *.iml +*.jar +NUL +old \ No newline at end of file