From 71055cd7f756fab436c7211bdfbd7d8bceac64cd Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 21 Jan 2026 15:46:42 +0300 Subject: [PATCH] expand process_status enumerate --- include/progressor.hrl | 6 +-- priv/schemas/postgres-schema.sql | 2 +- rebar.config | 2 +- rebar.lock | 2 +- src/prg_storage.erl | 6 --- src/prg_worker.erl | 50 +++++++++++++++-------- src/progressor.erl | 4 +- src/storage/postgres/prg_pg_backend.erl | 41 ------------------- src/storage/postgres/prg_pg_migration.erl | 18 ++++++++ test/prg_base_SUITE.erl | 4 +- 10 files changed, 62 insertions(+), 73 deletions(-) diff --git a/include/progressor.hrl b/include/progressor.hrl index 45f4b9e..5edb6b9 100644 --- a/include/progressor.hrl +++ b/include/progressor.hrl @@ -127,7 +127,7 @@ -type task_id() :: pos_integer(). -type process_status() :: binary(). -% <<"running">> | <<"error">> +% <<"init">> | <<"running">> | <<"error">> -type task_status() :: binary(). % <<"waiting">> | <<"running">> | <<"blocked">> | <<"error">> | <<"finished">> | <<"cancelled">> @@ -219,8 +219,8 @@ -define(NEW_PROCESS(ID), #{ process_id => ProcessId, - status => <<"running">>, - previous_status => <<"running">>, + status => <<"init">>, + previous_status => <<"init">>, created_at => erlang:system_time(second), status_changed_at => erlang:system_time(second) }). diff --git a/priv/schemas/postgres-schema.sql b/priv/schemas/postgres-schema.sql index ca6857f..7c191eb 100644 --- a/priv/schemas/postgres-schema.sql +++ b/priv/schemas/postgres-schema.sql @@ -1,4 +1,4 @@ -CREATE TYPE process_status AS ENUM ('running', 'error'); +CREATE TYPE process_status AS ENUM ('init', 'running', 'error'); CREATE TYPE task_status AS ENUM ('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled'); CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove'); diff --git a/rebar.config b/rebar.config index e6be635..1261a06 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,7 @@ {recon, "2.5.6"}, {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}}, {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, - {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.3"}}} + {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}} ]}. {xref_checks, [ diff --git a/rebar.lock b/rebar.lock index 004f7ba..28c39ff 100644 --- a/rebar.lock +++ b/rebar.lock @@ -7,7 +7,7 @@ {<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.11">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"af35200fa1c63e7afeaa90cad862944c194b2686"}}, + {ref,"939a0d4ab3f7561a79b45381bbe13029d9263006"}}, 0}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", diff --git a/src/prg_storage.erl b/src/prg_storage.erl index 5d5ca45..3712344 100644 --- a/src/prg_storage.erl +++ b/src/prg_storage.erl @@ -11,7 +11,6 @@ -export([prepare_repair/4]). -export([put_process_data/4]). -export([process_trace/3]). --export([get_process_with_initialization/4]). -export([repair_process/3]). %% scan functions @@ -72,11 +71,6 @@ put_process_data(#{client := Handler, options := HandlerOpts}, NsId, Id, Process process_trace(#{client := Handler, options := HandlerOpts}, NsId, Id) -> Handler:process_trace(HandlerOpts, NsId, Id). --spec get_process_with_initialization(storage_opts(), namespace_id(), id(), history_range()) -> - {ok, process()} | {error, _Reason}. -get_process_with_initialization(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, HistoryRange) -> - Handler:get_process_with_initialization(HandlerOpts, NsId, ProcessId, HistoryRange). - -spec prepare_init(storage_opts(), namespace_id(), id(), task()) -> {ok, {postpone, task_id()} | {continue, task_id()}} | {error, _Reason}. prepare_init(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, InitTask) -> diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 7484a76..d3bef1a 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -545,28 +545,26 @@ error_and_retry({error, Reason} = Response, TaskHeader, Task, Deadline, State) - update_process(#{status := <<"error">>, process_id := ProcessId} = Process, {error, _}) -> %% process error when already broken {Process, #{process_id => ProcessId}}; -update_process(#{status := <<"running">>, process_id := ProcessId} = Process, {error, {Detail, Cause}}) -> - %% process broken (transition from running to error) +update_process(#{status := Status, process_id := ProcessId} = Process, {error, {Detail, Cause}}) when + Status =:= <<"running">>; + Status =:= <<"init">> +-> + %% process broken (transition from running/init to error) StatusChangedAt = erlang:system_time(second), - NewProcess = - case Cause of - undefined -> - Process#{status => <<"error">>, detail => Detail}; - TaskId -> - Process#{status => <<"error">>, detail => Detail, corrupted_by => TaskId} - end, ProcessUpdates = #{ process_id => ProcessId, status => <<"error">>, - previous_status => <<"running">>, + previous_status => Status, status_changed_at => StatusChangedAt, - detail => Detail, - corrupted_by => Cause + detail => Detail }, - { - NewProcess, - ProcessUpdates - }; + case Cause of + undefined -> + {maps:merge(Process, ProcessUpdates), ProcessUpdates}; + TaskId -> + Updates = ProcessUpdates#{corrupted_by => TaskId}, + {maps:merge(Process, Updates), Updates} + end; update_process(#{status := <<"error">>, process_id := ProcessId} = Process, Intent) -> %% process repaired (transition from error to running) StatusChangedAt = erlang:system_time(second), @@ -583,6 +581,26 @@ update_process(#{status := <<"error">>, process_id := ProcessId} = Process, Inte corrupted_by => undefined }, update_process_from_intent(NewProcess, ProcessUpdates, Intent); +update_process(#{status := <<"init">>, process_id := ProcessId} = Process, Intent) -> + %% transition from init to running + StatusChangedAt = erlang:system_time(second), + ProcessUpdates = #{ + process_id => ProcessId, + status => <<"running">>, + previous_status => <<"init">>, + status_changed_at => StatusChangedAt + }, + NewProcess = maps:merge(Process, ProcessUpdates), + update_process_from_intent(NewProcess, ProcessUpdates, Intent); +update_process(#{previous_status := <<"init">>, status := <<"running">>, process_id := ProcessId} = Process, Intent) -> + %% first transition from running to running, need to update previous_status + ProcessUpdates = #{ + process_id => ProcessId, + status => <<"running">>, + previous_status => <<"running">> + }, + NewProcess = maps:merge(Process, ProcessUpdates), + update_process_from_intent(NewProcess, ProcessUpdates, Intent); update_process(#{status := <<"running">>, process_id := ProcessId} = Process, Intent) -> %% normal work update_process_from_intent(Process, #{process_id => ProcessId}, Intent). diff --git a/src/progressor.erl b/src/progressor.erl index 8473f9b..1428582 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -311,8 +311,8 @@ await_task_result(StorageOpts, NsId, KeyOrId, StepTimeout, Duration) -> end. do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) -> - case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of - {ok, #{initialization := _TaskId}} -> + case prg_storage:get_process(external, StorageOpts, NsId, Id, HistoryRange) of + {ok, #{status := <<"init">>}} -> %% init task not finished, await and retry Timeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT), timer:sleep(Timeout), diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 95755ec..a70f216 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -14,7 +14,6 @@ -export([prepare_repair/4]). -export([put_process_data/4]). -export([process_trace/3]). --export([get_process_with_initialization/4]). -export([repair_process/3]). %% scan functions @@ -139,32 +138,6 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) -> ), parse_process_info(RawResult, HistoryRange). --spec get_process_with_initialization(pg_opts(), namespace_id(), id(), history_range()) -> - {ok, process()} | {error, _Reason}. -get_process_with_initialization(PgOpts, NsId, ProcessId, HistoryRange) -> - Pool = get_pool(external, PgOpts), - #{ - processes := ProcessesTable, - running := RunningTable, - events := EventsTable - } = prg_pg_utils:tables(NsId), - RangeCondition = create_range_condition(HistoryRange), - RawResult = epg_pool:transaction( - Pool, - fun(Connection) -> - case do_get_process_with_initialization(Connection, ProcessesTable, RunningTable, ProcessId) of - {ok, _, []} -> - {error, <<"process not found">>}; - {ok, ColumnsPr, RowsPr} -> - {ok, _, _} = - {ok, ColumnstEv, RowsEv} = do_get_events(Connection, EventsTable, ProcessId, RangeCondition), - LastEventId = get_last_event_id(Connection, EventsTable, ProcessId), - {ok, {ColumnsPr, RowsPr}, {ColumnstEv, RowsEv}, LastEventId} - end - end - ), - parse_process_info(RawResult, HistoryRange). - -spec repair_process(pg_opts(), namespace_id(), id()) -> ok | no_return(). repair_process(PgOpts, NsId, ProcessId) -> Pool = get_pool(external, PgOpts), @@ -752,20 +725,6 @@ do_get_process(Connection, Table, ProcessId) -> [ProcessId] ). -do_get_process_with_initialization(Connection, ProcessesTable, RunningTable, ProcessId) -> - SQL = - "SELECT" - " pr.*, rt.task_id as initialization FROM " ++ ProcessesTable ++ - " pr " - " LEFT JOIN " ++ RunningTable ++ - " rt ON pr.process_id = rt.process_id AND rt.task_type = 'init'" - " WHERE pr.process_id = $1", - epg_pool:query( - Connection, - SQL, - [ProcessId] - ). - parse_process_info(RawResult, HistoryRange) -> case RawResult of {error, _} = Error -> diff --git a/src/storage/postgres/prg_pg_migration.erl b/src/storage/postgres/prg_pg_migration.erl index 0b96d4b..5dee44d 100644 --- a/src/storage/postgres/prg_pg_migration.erl +++ b/src/storage/postgres/prg_pg_migration.erl @@ -241,6 +241,24 @@ db_init(#{pool := Pool}, NsId) -> ) end, + %% MIGRATION 3 + %% Expand prosess_status enumeration + {ok, _, [{IsInitStatusExists}]} = epg_pool:query( + Connection, + "select exists (SELECT 1 FROM pg_enum WHERE " + " enumtypid = 'process_status'::regtype and enumlabel = 'init')" + ), + _ = + case IsInitStatusExists of + true -> + ok; + false -> + {ok, _, _} = epg_pool:query( + Connection, + "ALTER TYPE process_status ADD VALUE 'init'" + ) + end, + %%% END {ok, [], []} end diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 15fd707..6a2fb46 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -36,7 +36,7 @@ -export([task_race_condition_hack_test/1]). -define(NS(C), proplists:get_value(ns_id, C, 'default/default')). --define(AWAIT_TIMEOUT(C), proplists:get_value(repl_timeout, C, 0)). +-define(AWAIT_TIMEOUT(C), proplists:get_value(repl_timeout, C, 5)). init_per_suite(Config) -> Config. @@ -1410,12 +1410,12 @@ mock_processor(put_process_with_timeout_test = TestCase) -> mock_processor(task_race_condition_hack_test = TestCase) -> Self = self(), MockProcessor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> + Self ! 1, timer:sleep(3000), Result = #{ events => [event(1)], aux_state => <<"aux_state">> }, - Self ! 1, {ok, Result} end, mock_processor(TestCase, MockProcessor).