From d0445d02afe8bcac2716dbcf346887c529632235 Mon Sep 17 00:00:00 2001 From: Michael Crumm Date: Thu, 13 Oct 2022 15:09:57 -0700 Subject: [PATCH 1/2] Add clarity to Fable.create_handler/4 --- lib/fable.ex | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/fable.ex b/lib/fable.ex index e4c30fa..600a942 100644 --- a/lib/fable.ex +++ b/lib/fable.ex @@ -12,8 +12,17 @@ defmodule Fable do `stream_cursors` database table. """ + @doc """ + Creates a new handler for a [`ProcessManager`](`Fable.ProcessManager`). + + ## Examples + + Fable.create_handler(MyApp.Events, :pm, MyApp.ProcessManager, %{}) + # => {:ok, %Fable.ProcessManager.State{}} + """ + def create_handler(events, name, module, initial_state) do - config = events.__fable_config__ + config = events.__fable_config__() last_event_id = case module.start_at(initial_state) do From ca33bcd0797126d8d65dc2419b7918385a6234da Mon Sep 17 00:00:00 2001 From: Michael Crumm Date: Thu, 13 Oct 2022 17:05:38 -0700 Subject: [PATCH 2/2] WIP: sync process manager state --- lib/fable.ex | 18 +++++- lib/fable/config.ex | 11 ++++ lib/fable/events.ex | 44 ++++++++++---- lib/fable/process_manager.ex | 84 +++------------------------ lib/fable/process_manager/workflow.ex | 83 ++++++++++++++++++++++++++ 5 files changed, 151 insertions(+), 89 deletions(-) create mode 100644 lib/fable/process_manager/workflow.ex diff --git a/lib/fable.ex b/lib/fable.ex index 600a942..cd795c8 100644 --- a/lib/fable.ex +++ b/lib/fable.ex @@ -55,16 +55,28 @@ defmodule Fable do @doc false def init(%{registry: registry} = config) do + children = + List.flatten([ + {Registry, keys: :unique, name: registry}, + process_manager_children(config.process_manager_mode, config) + ]) + + Supervisor.init(children, strategy: :rest_for_one) + end + + defp process_manager_children(:notify, config) do + %{registry: registry} = config notifications_name = via(registry, Notifications) - children = [ - {Registry, keys: :unique, name: registry}, + [ notifications_child(config.repo, notifications_name), {DynamicSupervisor, strategy: :one_for_one, name: via(registry, ProcessManagerSupervisor)}, {__MODULE__.ProcessManager.Locks, config} ] + end - Supervisor.init(children, strategy: :rest_for_one) + defp process_manager_children(:sync, _config) do + [] end defp notifications_child(repo, name) do diff --git a/lib/fable/config.ex b/lib/fable/config.ex index 597155a..f1b7864 100644 --- a/lib/fable/config.ex +++ b/lib/fable/config.ex @@ -2,11 +2,14 @@ defmodule Fable.Config do # Documented in `Fable.Events` @moduledoc false + @type process_manager_mode :: :sync | :notify + @type t :: %Fable.Config{ repo: Ecto.Repo.t(), registry: module(), router: Fable.Router.t(), event_schema: module(), + process_manager_mode: process_manager_mode(), process_manager_schema: module(), json_library: module } @@ -16,6 +19,7 @@ defmodule Fable.Config do :registry, :router, event_schema: Fable.Event, + process_manager_mode: :notify, process_manager_schema: Fable.ProcessManager.State, json_library: Jason ] @@ -25,9 +29,16 @@ defmodule Fable.Config do attrs = attrs |> Map.new() + |> Map.put_new(:process_manager_mode, :notify) |> Map.put_new(:router, module) |> Map.put_new(:registry, Module.concat(Fable, attrs[:repo])) + unless attrs.process_manager_mode in [:sync, :notify] do + raise ArgumentError, + "invalid :process_manager_mode, expected one of :sync, :notify" <> + ", got: #{inspect(attrs.process_manager_mode)}" + end + struct!(__MODULE__, attrs) end end diff --git a/lib/fable/events.ex b/lib/fable/events.ex index 6d24eef..dab26ec 100644 --- a/lib/fable/events.ex +++ b/lib/fable/events.ex @@ -249,11 +249,13 @@ defmodule Fable.Events do config = Map.put(config, :repo, repo) aggregate = lock(aggregate, repo) || aggregate - events = case fun do - fun when is_function(fun, 3) -> - fun.(aggregate, repo, changes) - fun when is_function(fun, 2) -> - fun.(aggregate, repo) + events = + case fun do + fun when is_function(fun, 3) -> + fun.(aggregate, repo, changes) + + fun when is_function(fun, 2) -> + fun.(aggregate, repo) end result_of_applied_events = handle_events(config, aggregate, events, opts) @@ -316,14 +318,20 @@ defmodule Fable.Events do event = Fable.Event.parse_data(repo, event) - case Map.fetch!(config.router.handlers(), Module.safe_concat([event.type])) do - functions when is_list(functions) -> - reduce_while_ok(aggregate, functions, fn aggregate, function -> + result = + case Map.fetch!(config.router.handlers(), Module.safe_concat([event.type])) do + functions when is_list(functions) -> + reduce_while_ok(aggregate, functions, fn aggregate, function -> + function.(aggregate, event.data) + end) + + function when is_function(function) -> function.(aggregate, event.data) - end) + end - function when is_function(function) -> - function.(aggregate, event.data) + with {:ok, _} <- result do + :ok = sync_process_managers(config, repo, event) + result end end @@ -348,6 +356,20 @@ defmodule Fable.Events do |> Ecto.Changeset.cast(attrs, Map.keys(attrs)) end + @spec sync_process_managers(Fable.Config.t(), Ecto.Repo.t(), Fable.Event.t()) :: :ok + defp sync_process_managers(%{process_manager_mode: :sync} = config, repo, event) do + config.process_manager_schema + |> Fable.Event.active() + |> repo.all() + |> Enum.each(fn pm -> + Fable.ProcessManager.Workflow.execute!(pm, event, repo) + end) + + :ok + end + + defp sync_process_managers(_config, _repo, _aggregate), do: :ok + @spec rollback_on_error(Ecto.Repo.t(), {:ok | :error, term}) :: term | no_return defp rollback_on_error(repo, {:error, error}) do repo.rollback(error) diff --git a/lib/fable/process_manager.ex b/lib/fable/process_manager.ex index aa90249..3c43361 100644 --- a/lib/fable/process_manager.ex +++ b/lib/fable/process_manager.ex @@ -148,84 +148,18 @@ defmodule Fable.ProcessManager do end defp handle_event(event, state) do - case run_handler(state, event) do - {:ok, data} -> - state.handler - |> Fable.ProcessManager.State.progress_to(event.id, data) - |> state.repo.update() - |> case do - {:ok, handler} -> - {:cont, %__MODULE__{state | handler: handler}} - - {:error, error} -> - Logger.error(""" - Handler #{state.handler.name} handler error: - #{inspect(error)} - Stopping! - """) - - disable(state.repo, state.handler.name) - {:halt, %{state | handler: nil}} - end - - error -> - Logger.error(""" - Handler #{state.handler.name} error: - #{inspect(error)} - """) - - handler = - case apply(state.handler.module, :handle_error, [event, error, state.handler.state]) do - {:retry, interval, handler_state} -> - Logger.info("Handler #{state.handler.name} retrying in #{interval}...") - Process.send_after(self(), :retry, interval) - - state.handler - |> Fable.ProcessManager.State.update_state(handler_state) - |> state.repo.update!() - - :stop -> - Logger.error(""" - Handler #{state.handler.name} stopped! - Manual intervention required! - """) - - disable(state.repo, state.handler.name) - - nil - - other -> - Logger.error(""" - Handler #{state.handler.name} failed to handle error! - Returned: #{inspect(other)} - Manual intervention required! - """) - - disable(state.repo, state.handler.name) - - nil - end + case Fable.ProcessManager.Workflow.execute(state.handler, event, state.repo) do + {:ok, handler} -> + {:cont, %__MODULE__{state | handler: handler}} + {:retry, interval, handler} -> + Logger.info("Handler #{state.handler.name} retrying in #{interval}...") + Process.send_after(self(), :retry, interval) {:halt, %{state | handler: handler}} - end - end - defp run_handler(state, event) do - Logger.debug(""" - Handler #{state.handler.name} handling: #{inspect(event)} - """) - - event = Fable.Event.parse_data(state.repo, event) - apply(state.handler.module, :handle_event, [event.data, state.handler.state]) - rescue - e -> - Logger.error(""" - Handler #{state.handler.name} raised exception! - #{inspect(e)} - #{Exception.format_stacktrace(__STACKTRACE__)} - """) - - {:error, e} + :stop -> + {:halt, %{state | handler: nil}} + end end defp get_events(state) do diff --git a/lib/fable/process_manager/workflow.ex b/lib/fable/process_manager/workflow.ex new file mode 100644 index 0000000..43b9566 --- /dev/null +++ b/lib/fable/process_manager/workflow.ex @@ -0,0 +1,83 @@ +defmodule Fable.ProcessManager.Workflow do + # Executes the process manager workflow. + @moduledoc false + alias Fable.ProcessManager + require Logger + + def execute!(pm, event, repo) do + case execute(pm, event, repo) do + {:ok, pm} -> + pm + + {:retry, interval, pm} -> + :ok = Process.sleep(interval) + execute!(pm, event, repo) + + :stop -> + raise RuntimeError, "failed to execute! (todo)" + end + end + + def execute(pm, event, repo) do + %{module: mod, state: state} = pm + + case mod.handle_event(event.data, state) do + {:ok, data} -> + pm + |> ProcessManager.State.progress_to(event.id, data) + |> repo.update() + + error -> + Logger.error(""" + Handler #{pm.name} error: + #{inspect(error)} + """) + + handle_error(pm, error, event, repo) + end + rescue + e -> + Logger.error(""" + Handler #{pm.name} raised exception! + #{inspect(e)} + #{Exception.format_stacktrace(__STACKTRACE__)} + """) + + :stop + end + + defp handle_error(pm, error, event, repo) do + %{module: mod, state: state} = pm + + case mod.handle_error(event, error, state) do + {:retry, interval, new_state} -> + new_pm = + pm + |> ProcessManager.State.update_state(new_state) + |> repo.update!() + + {:retry, interval, new_pm} + + :stop -> + Logger.error(""" + Handler #{pm.name} stopped! + Manual intervention required! + """) + + ProcessManager.disable(repo, pm.name) + + :stop + + other -> + Logger.error(""" + Handler #{pm.name} failed to handle error! + Returned: #{inspect(other)} + Manual intervention required! + """) + + ProcessManager.disable(repo, pm.name) + + :stop + end + end +end