Skip to content

Commit 0247626

Browse files
ttt161ttt161
andauthored
fix simple repair on devel (#38)
Co-authored-by: ttt161 <losto@nix>
1 parent ed7e396 commit 0247626

7 files changed

Lines changed: 133 additions & 23 deletions

File tree

elvis.config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
{elvis_style, no_spec_with_records},
2828
{elvis_style, no_debug_call, #{}},
2929
{elvis_style, export_used_types, disable},
30+
{elvis_style, god_modules, disable},
3031
%% FIXME Maybe refactor code blocks
3132
{elvis_style, dont_repeat_yourself, #{
3233
ignore => [

include/progressor.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@
130130

131131
-type history_range() :: #{
132132
offset => non_neg_integer(),
133-
limit => pos_integer(),
133+
limit => non_neg_integer(),
134134
direction => forward | backward
135135
}.
136136

src/prg_storage.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
-export([put_process_data/4]).
1313
-export([process_trace/3]).
1414
-export([get_process_with_initialization/4]).
15+
-export([repair_process/3]).
1516

1617
%% scan functions
1718
-export([search_timers/4]).
@@ -90,6 +91,10 @@ prepare_call(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, Task
9091
prepare_repair(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, RepairTask) ->
9192
Handler:prepare_repair(HandlerOpts, NsId, ProcessId, RepairTask).
9293

94+
-spec repair_process(storage_opts(), namespace_id(), id()) -> ok | no_return().
95+
repair_process(#{client := Handler, options := HandlerOpts}, NsId, ProcessId) ->
96+
Handler:repair_process(HandlerOpts, NsId, ProcessId).
97+
9398
%%%%%%%%%%%%%%%%%
9499
%% Scan functions
95100
%%%%%%%%%%%%%%%%%

src/prg_worker.erl

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,15 @@ handle_result(
171171
#prg_worker_state{
172172
ns_id = NsId,
173173
ns_opts = #{storage := StorageOpts} = NsOpts,
174-
process = #{process_id := ProcessId} = Process,
174+
process = #{process_id := ProcessId, status := OldStatus} = Process,
175175
sidecar_pid = Pid
176176
} = State
177177
) ->
178178
Now = erlang:system_time(second),
179-
ProcessUpdated = update_process(
180-
maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
181-
),
179+
#{status := NewStatus} =
180+
ProcessUpdated = update_process(
181+
maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
182+
),
182183
Response = response(maps:get(response, Result, undefined)),
183184
TaskResult = #{
184185
task_id => TaskId,
@@ -196,7 +197,7 @@ handle_result(
196197
attempts_count => 0
197198
},
198199
ok = prg_worker_sidecar:lifecycle_sink(
199-
Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
200+
Pid, Deadline, NsOpts, lifecycle_event(TaskHeader, OldStatus, NewStatus), ProcessId
200201
),
201202
ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
202203
SaveResult = prg_worker_sidecar:complete_and_continue(
@@ -249,17 +250,18 @@ handle_result(
249250
#prg_worker_state{
250251
ns_id = NsId,
251252
ns_opts = #{storage := StorageOpts} = NsOpts,
252-
process = #{process_id := ProcessId} = Process,
253+
process = #{process_id := ProcessId, status := OldStatus} = Process,
253254
sidecar_pid = Pid
254255
} = State
255256
) ->
257+
#{status := NewStatus} =
258+
ProcessUpdated = update_process(
259+
maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
260+
),
256261
ok = prg_worker_sidecar:lifecycle_sink(
257-
Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
262+
Pid, Deadline, NsOpts, lifecycle_event(TaskHeader, OldStatus, NewStatus), ProcessId
258263
),
259264
ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
260-
ProcessUpdated = update_process(
261-
maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
262-
),
263265
Response = response(maps:get(response, Result, undefined)),
264266
TaskResult = #{
265267
task_id => TaskId,
@@ -303,7 +305,7 @@ handle_result(
303305
) ->
304306
Now = erlang:system_time(second),
305307
ok = prg_worker_sidecar:lifecycle_sink(
306-
Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
308+
Pid, Deadline, NsOpts, repair, ProcessId
307309
),
308310
ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
309311
ProcessUpdated = update_process(
@@ -368,17 +370,18 @@ handle_result(
368370
#prg_worker_state{
369371
ns_id = NsId,
370372
ns_opts = #{storage := StorageOpts} = NsOpts,
371-
process = #{process_id := ProcessId} = Process,
373+
process = #{process_id := ProcessId, status := OldStatus} = Process,
372374
sidecar_pid = Pid
373375
} = State
374376
) ->
377+
#{status := NewStatus} =
378+
ProcessUpdated = update_process(
379+
maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
380+
),
375381
ok = prg_worker_sidecar:lifecycle_sink(
376-
Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
382+
Pid, Deadline, NsOpts, lifecycle_event(TaskHeader, OldStatus, NewStatus), ProcessId
377383
),
378384
ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
379-
ProcessUpdated = update_process(
380-
maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
381-
),
382385
Response = response(maps:get(response, Result, undefined)),
383386
TaskResult = #{
384387
task_id => TaskId,
@@ -601,3 +604,10 @@ last_event_id([]) ->
601604
last_event_id(History) ->
602605
[#{event_id := Id} | _] = lists:reverse(History),
603606
Id.
607+
608+
lifecycle_event({timeout, _}, <<"error">>, <<"running">>) ->
609+
repair;
610+
lifecycle_event({timeout, _}, _, _) ->
611+
timeout;
612+
lifecycle_event({TaskType, _}, _, _) ->
613+
TaskType.

src/progressor.erl

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,11 @@ simple_repair(Req) ->
8888
[
8989
fun add_ns_opts/1,
9090
fun check_idempotency/1,
91-
fun(Data) -> check_process_status(Data, <<"error">>) end,
92-
fun add_task/1,
91+
fun check_process_continuation/1,
9392
fun(Data) -> prepare_postponed(fun prg_storage:prepare_call/4, Data) end,
94-
fun(_Data) -> {ok, ok} end
93+
fun do_simple_repair/1
9594
],
96-
Req#{type => timeout}
95+
Req
9796
).
9897

9998
-spec get(request()) -> {ok, _Result} | {error, _Reason}.
@@ -205,6 +204,33 @@ check_process_status(
205204
{error, _} = Error -> Error
206205
end.
207206

207+
check_process_continuation(
208+
#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId} = Opts
209+
) ->
210+
case prg_storage:get_process(external, StorageOpts, NsId, Id, #{limit => 0}) of
211+
{ok, #{status := <<"running">>}} ->
212+
{error, <<"process is running">>};
213+
{ok, #{status := <<"error">>, corrupted_by := TaskId}} when is_integer(TaskId) ->
214+
case prg_storage:get_task(external, StorageOpts, NsId, TaskId) of
215+
{ok, #{task_type := Type}} when Type =:= <<"timeout">>; Type =:= <<"remove">> ->
216+
add_task(Opts#{type => Type});
217+
{ok, _} ->
218+
Opts
219+
end;
220+
{ok, #{status := <<"error">>}} ->
221+
Opts;
222+
{error, _} = Error ->
223+
Error
224+
end.
225+
226+
do_simple_repair(#{task := _T}) ->
227+
%% process will repaired via timeout task
228+
{ok, ok};
229+
do_simple_repair(#{ns_opts := #{storage := StorageOpts} = NsOpts, id := Id, ns := NsId}) ->
230+
ok = prg_storage:repair_process(StorageOpts, NsId, Id),
231+
ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id),
232+
{ok, ok}.
233+
208234
prepare(
209235
Fun,
210236
#{ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, id := ProcessId, task := Task} =
@@ -251,7 +277,10 @@ prepare_postponed(
251277
Req#{task => Task#{task_id => TaskId}};
252278
{error, _} = Error ->
253279
Error
254-
end.
280+
end;
281+
prepare_postponed(_Fun, Req) ->
282+
%% Req without task, skip this step
283+
Req.
255284

256285
get_task_result(#{
257286
ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, idempotency_key := IdempotencyKey
@@ -377,7 +406,9 @@ convert_task_type(notify) ->
377406
convert_task_type(timeout) ->
378407
<<"timeout">>;
379408
convert_task_type(repair) ->
380-
<<"repair">>.
409+
<<"repair">>;
410+
convert_task_type(Type) when is_binary(Type) ->
411+
Type.
381412

382413
maybe_add_idempotency(Task, undefined) ->
383414
Task;

src/storage/postgres/prg_pg_backend.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
-export([put_process_data/4]).
1616
-export([process_trace/3]).
1717
-export([get_process_with_initialization/4]).
18+
-export([repair_process/3]).
1819

1920
%% scan functions
2021
-export([collect_zombies/3]).
@@ -163,6 +164,19 @@ get_process_with_initialization(PgOpts, NsId, ProcessId, HistoryRange) ->
163164
),
164165
parse_process_info(RawResult, HistoryRange).
165166

167+
-spec repair_process(pg_opts(), namespace_id(), id()) -> ok | no_return().
168+
repair_process(PgOpts, NsId, ProcessId) ->
169+
Pool = get_pool(external, PgOpts),
170+
#{processes := ProcessesTable} = prg_pg_utils:tables(NsId),
171+
{ok, 1} = epg_pool:query(
172+
Pool,
173+
"UPDATE " ++ ProcessesTable ++
174+
" SET status = 'running', corrupted_by = null, detail = null "
175+
" WHERE process_id = $1",
176+
[ProcessId]
177+
),
178+
ok.
179+
166180
-spec put_process_data(
167181
pg_opts(),
168182
namespace_id(),

test/prg_base_SUITE.erl

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
-export([postponed_call_to_suspended_process_test/1]).
2323
-export([multiple_calls_test/1]).
2424
-export([simple_repair_after_non_retriable_error_test/1]).
25+
-export([simple_repair_after_call_error_test/1]).
2526
-export([repair_after_non_retriable_error_test/1]).
2627
-export([error_after_max_retries_test/1]).
2728
-export([repair_after_call_error_test/1]).
@@ -69,6 +70,7 @@ groups() ->
6970
postponed_call_to_suspended_process_test,
7071
multiple_calls_test,
7172
simple_repair_after_non_retriable_error_test,
73+
simple_repair_after_call_error_test,
7274
repair_after_non_retriable_error_test,
7375
error_after_max_retries_test,
7476
repair_after_call_error_test,
@@ -487,6 +489,36 @@ simple_repair_after_non_retriable_error_test(C) ->
487489
unmock_processor(),
488490
ok.
489491

492+
-spec simple_repair_after_call_error_test(_) -> _.
493+
simple_repair_after_call_error_test(C) ->
494+
%% steps:
495+
%% 1. init -> [], undefined
496+
%% 2. call -> {error, retry_this}
497+
%% 3. simple_repair
498+
_ = mock_processor(simple_repair_after_call_error_test),
499+
Id = gen_id(),
500+
{ok, ok} = progressor:init(#{ns => ?NS(C), id => Id, args => <<"init_args">>}),
501+
{error, retry_this} = progressor:call(#{ns => ?NS(C), id => Id, args => <<"call_args">>}),
502+
2 = expect_steps_counter(2),
503+
timer:sleep(?AWAIT_TIMEOUT(C)),
504+
{ok, #{
505+
detail := <<"retry_this">>,
506+
metadata := #{<<"k">> := <<"v">>},
507+
history := [],
508+
process_id := Id,
509+
status := <<"error">>
510+
}} = progressor:get(#{ns => ?NS(C), id => Id}),
511+
{ok, ok} = progressor:simple_repair(#{ns => ?NS(C), id => Id}),
512+
timer:sleep(?AWAIT_TIMEOUT(C)),
513+
{ok, #{
514+
metadata := #{<<"k">> := <<"v">>},
515+
history := [],
516+
process_id := Id,
517+
status := <<"running">>
518+
}} = progressor:get(#{ns => ?NS(C), id => Id}),
519+
unmock_processor(),
520+
ok.
521+
490522
-spec repair_after_non_retriable_error_test(_) -> _.
491523
repair_after_non_retriable_error_test(C) ->
492524
%% steps:
@@ -1144,6 +1176,23 @@ mock_processor(simple_repair_after_non_retriable_error_test = TestCase) ->
11441176
end,
11451177
mock_processor(TestCase, MockProcessor);
11461178
%%
1179+
mock_processor(simple_repair_after_call_error_test = TestCase) ->
1180+
Self = self(),
1181+
MockProcessor = fun
1182+
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
1183+
Result = #{
1184+
metadata => #{<<"k">> => <<"v">>},
1185+
events => []
1186+
},
1187+
Self ! 1,
1188+
{ok, Result};
1189+
({call, <<"call_args">>, #{history := []} = _Process}, _Opts, _Ctx) ->
1190+
Self ! 2,
1191+
%% retriable error for call must be ignore and process set error status
1192+
{error, retry_this}
1193+
end,
1194+
mock_processor(TestCase, MockProcessor);
1195+
%%
11471196
mock_processor(repair_after_non_retriable_error_test = TestCase) ->
11481197
Self = self(),
11491198
MockProcessor = fun

0 commit comments

Comments
 (0)