Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/progressor.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
-type task_id() :: pos_integer().

-type process_status() :: binary().
% <<"running">> | <<"error">>
% <<"init">> | <<"running">> | <<"error">>

-type task_status() :: binary().
% <<"waiting">> | <<"running">> | <<"blocked">> | <<"error">> | <<"finished">> | <<"cancelled">>
Expand Down Expand Up @@ -219,8 +219,8 @@

-define(NEW_PROCESS(ID), #{
process_id => ProcessId,
status => <<"running">>,
previous_status => <<"running">>,
status => <<"init">>,
previous_status => <<"init">>,
created_at => erlang:system_time(second),
status_changed_at => erlang:system_time(second)
}).
2 changes: 1 addition & 1 deletion priv/schemas/postgres-schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TYPE process_status AS ENUM ('running', 'error');
CREATE TYPE process_status AS ENUM ('init', 'running', 'error');
CREATE TYPE task_status AS ENUM ('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled');
CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove');

Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{recon, "2.5.6"},
{thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}},
{mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}},
{epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.3"}}}
{epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}}
]}.

{xref_checks, [
Expand Down
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.11">>},2},
{<<"epg_connector">>,
{git,"https://github.com/valitydev/epg_connector.git",
{ref,"af35200fa1c63e7afeaa90cad862944c194b2686"}},
{ref,"939a0d4ab3f7561a79b45381bbe13029d9263006"}},
0},
{<<"epgsql">>,
{git,"https://github.com/epgsql/epgsql.git",
Expand Down
6 changes: 0 additions & 6 deletions src/prg_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
-export([prepare_repair/4]).
-export([put_process_data/4]).
-export([process_trace/3]).
-export([get_process_with_initialization/4]).
-export([repair_process/3]).

%% scan functions
Expand Down Expand Up @@ -72,11 +71,6 @@ put_process_data(#{client := Handler, options := HandlerOpts}, NsId, Id, Process
process_trace(#{client := Handler, options := HandlerOpts}, NsId, Id) ->
Handler:process_trace(HandlerOpts, NsId, Id).

%% Thin dispatch wrapper: forwards the request to the storage backend module
%% found under the `client` key, passing that backend's own `options` map
%% together with the namespace, process id and history range unchanged.
%% The backend's result is returned as-is.
-spec get_process_with_initialization(storage_opts(), namespace_id(), id(), history_range()) ->
    {ok, process()} | {error, _Reason}.
get_process_with_initialization(#{client := Backend, options := BackendOpts}, NsId, ProcessId, HistoryRange) ->
    Backend:get_process_with_initialization(BackendOpts, NsId, ProcessId, HistoryRange).

-spec prepare_init(storage_opts(), namespace_id(), id(), task()) ->
{ok, {postpone, task_id()} | {continue, task_id()}} | {error, _Reason}.
prepare_init(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, InitTask) ->
Expand Down
50 changes: 34 additions & 16 deletions src/prg_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -545,28 +545,26 @@ error_and_retry({error, Reason} = Response, TaskHeader, Task, Deadline, State) -
update_process(#{status := <<"error">>, process_id := ProcessId} = Process, {error, _}) ->
%% process error when already broken
{Process, #{process_id => ProcessId}};
update_process(#{status := <<"running">>, process_id := ProcessId} = Process, {error, {Detail, Cause}}) ->
%% process broken (transition from running to error)
update_process(#{status := Status, process_id := ProcessId} = Process, {error, {Detail, Cause}}) when
Status =:= <<"running">>;
Status =:= <<"init">>
->
%% process broken (transition from running/init to error)
StatusChangedAt = erlang:system_time(second),
NewProcess =
case Cause of
undefined ->
Process#{status => <<"error">>, detail => Detail};
TaskId ->
Process#{status => <<"error">>, detail => Detail, corrupted_by => TaskId}
end,
ProcessUpdates = #{
process_id => ProcessId,
status => <<"error">>,
previous_status => <<"running">>,
previous_status => Status,
status_changed_at => StatusChangedAt,
detail => Detail,
corrupted_by => Cause
detail => Detail
},
{
NewProcess,
ProcessUpdates
};
case Cause of
undefined ->
{maps:merge(Process, ProcessUpdates), ProcessUpdates};
TaskId ->
Updates = ProcessUpdates#{corrupted_by => TaskId},
{maps:merge(Process, Updates), Updates}
end;
update_process(#{status := <<"error">>, process_id := ProcessId} = Process, Intent) ->
%% process repaired (transition from error to running)
StatusChangedAt = erlang:system_time(second),
Expand All @@ -583,6 +581,26 @@ update_process(#{status := <<"error">>, process_id := ProcessId} = Process, Inte
corrupted_by => undefined
},
update_process_from_intent(NewProcess, ProcessUpdates, Intent);
update_process(#{status := <<"init">>, process_id := ProcessId} = Process, Intent) ->
%% transition from init to running
StatusChangedAt = erlang:system_time(second),
ProcessUpdates = #{
process_id => ProcessId,
status => <<"running">>,
previous_status => <<"init">>,
status_changed_at => StatusChangedAt
},
NewProcess = maps:merge(Process, ProcessUpdates),
update_process_from_intent(NewProcess, ProcessUpdates, Intent);
update_process(#{previous_status := <<"init">>, status := <<"running">>, process_id := ProcessId} = Process, Intent) ->
%% first transition from running to running, need to update previous_status
ProcessUpdates = #{
process_id => ProcessId,
status => <<"running">>,
previous_status => <<"running">>
},
NewProcess = maps:merge(Process, ProcessUpdates),
update_process_from_intent(NewProcess, ProcessUpdates, Intent);
update_process(#{status := <<"running">>, process_id := ProcessId} = Process, Intent) ->
%% normal work
update_process_from_intent(Process, #{process_id => ProcessId}, Intent).
Expand Down
4 changes: 2 additions & 2 deletions src/progressor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ await_task_result(StorageOpts, NsId, KeyOrId, StepTimeout, Duration) ->
end.

do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) ->
case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of
{ok, #{initialization := _TaskId}} ->
case prg_storage:get_process(external, StorageOpts, NsId, Id, HistoryRange) of
{ok, #{status := <<"init">>}} ->
%% init task not finished, await and retry
Timeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT),
timer:sleep(Timeout),
Expand Down
41 changes: 0 additions & 41 deletions src/storage/postgres/prg_pg_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
-export([prepare_repair/4]).
-export([put_process_data/4]).
-export([process_trace/3]).
-export([get_process_with_initialization/4]).
-export([repair_process/3]).

%% scan functions
Expand Down Expand Up @@ -139,32 +138,6 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) ->
),
parse_process_info(RawResult, HistoryRange).

%% Loads a process row (joined with its pending 'init' task, if any), the
%% events selected by HistoryRange and the last event id, all inside one
%% epg_pool transaction on the `external` pool. The raw transaction result is
%% handed to parse_process_info/2.
%% When the process query yields no rows the transaction fun returns
%% {error, <<"process not found">>}.
-spec get_process_with_initialization(pg_opts(), namespace_id(), id(), history_range()) ->
    {ok, process()} | {error, _Reason}.
get_process_with_initialization(PgOpts, NsId, ProcessId, HistoryRange) ->
    Pool = get_pool(external, PgOpts),
    #{
        processes := ProcessesTable,
        running := RunningTable,
        events := EventsTable
    } = prg_pg_utils:tables(NsId),
    RangeCondition = create_range_condition(HistoryRange),
    Fetch = fun(Conn) ->
        case do_get_process_with_initialization(Conn, ProcessesTable, RunningTable, ProcessId) of
            {ok, _Columns, []} ->
                {error, <<"process not found">>};
            {ok, ProcessColumns, ProcessRows} ->
                %% assertive match: anything but {ok, _, _} aborts the transaction fun
                {ok, EventColumns, EventRows} =
                    do_get_events(Conn, EventsTable, ProcessId, RangeCondition),
                LastEventId = get_last_event_id(Conn, EventsTable, ProcessId),
                {ok, {ProcessColumns, ProcessRows}, {EventColumns, EventRows}, LastEventId}
        end
    end,
    TxResult = epg_pool:transaction(Pool, Fetch),
    parse_process_info(TxResult, HistoryRange).

-spec repair_process(pg_opts(), namespace_id(), id()) -> ok | no_return().
repair_process(PgOpts, NsId, ProcessId) ->
Pool = get_pool(external, PgOpts),
Expand Down Expand Up @@ -752,20 +725,6 @@ do_get_process(Connection, Table, ProcessId) ->
[ProcessId]
).

%% Selects the process row for ProcessId and LEFT JOINs the running-tasks
%% table so that the task_id of a task with task_type = 'init' (when one
%% exists) is exposed as the extra column `initialization`.
%% Table names are spliced into the SQL text; ProcessId is bound as $1.
do_get_process_with_initialization(Connection, ProcessesTable, RunningTable, ProcessId) ->
    Select = "SELECT pr.*, rt.task_id as initialization FROM ",
    %% the double space before LEFT JOIN is kept so the query text is unchanged
    Join = " pr  LEFT JOIN ",
    Filter =
        " rt ON pr.process_id = rt.process_id AND rt.task_type = 'init'"
        " WHERE pr.process_id = $1",
    Query = Select ++ ProcessesTable ++ Join ++ RunningTable ++ Filter,
    epg_pool:query(Connection, Query, [ProcessId]).

parse_process_info(RawResult, HistoryRange) ->
case RawResult of
{error, _} = Error ->
Expand Down
18 changes: 18 additions & 0 deletions src/storage/postgres/prg_pg_migration.erl
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,24 @@ db_init(#{pool := Pool}, NsId) ->
)
end,

%% MIGRATION 3
%% Expand process_status enumeration
{ok, _, [{IsInitStatusExists}]} = epg_pool:query(
Connection,
"select exists (SELECT 1 FROM pg_enum WHERE "
" enumtypid = 'process_status'::regtype and enumlabel = 'init')"
),
_ =
case IsInitStatusExists of
true ->
ok;
false ->
{ok, _, _} = epg_pool:query(
Connection,
"ALTER TYPE process_status ADD VALUE 'init'"
)
end,

%%% END
{ok, [], []}
end
Expand Down
4 changes: 2 additions & 2 deletions test/prg_base_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
-export([task_race_condition_hack_test/1]).

-define(NS(C), proplists:get_value(ns_id, C, 'default/default')).
-define(AWAIT_TIMEOUT(C), proplists:get_value(repl_timeout, C, 0)).
-define(AWAIT_TIMEOUT(C), proplists:get_value(repl_timeout, C, 5)).

init_per_suite(Config) ->
Config.
Expand Down Expand Up @@ -1410,12 +1410,12 @@ mock_processor(put_process_with_timeout_test = TestCase) ->
mock_processor(task_race_condition_hack_test = TestCase) ->
Self = self(),
MockProcessor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
Self ! 1,
timer:sleep(3000),
Result = #{
events => [event(1)],
aux_state => <<"aux_state">>
},
Self ! 1,
{ok, Result}
end,
mock_processor(TestCase, MockProcessor).
Expand Down
Loading