Skip to content

Commit fb5062c

Browse files
ttt161 and ttt161 authored
add internal timers for schedule (#40)
* add internal timers for schedule
* cleanup

---------

Co-authored-by: ttt161 <losto@nix>
1 parent fdebffd commit fb5062c

10 files changed

Lines changed: 326 additions & 51 deletions

File tree

include/progressor.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
worker_pool_size => pos_integer(),
8989
process_step_timeout => timeout_sec(),
9090
task_scan_timeout => timeout_sec(),
91+
call_scan_timeout => timeout_sec(),
9192
last_timer_repair => boolean()
9293
}.
9394

@@ -196,12 +197,15 @@
196197
-type timestamp_ms() :: non_neg_integer().
197198
-type timestamp_sec() :: non_neg_integer().
198199
-type timeout_sec() :: non_neg_integer().
200+
-type timeout_ms() :: non_neg_integer().
199201

200202
%%%
201203
%%% Constants
202204
%%%
203205
-define(DEFAULT_STEP_TIMEOUT_SEC, 60).
204206

207+
-define(DEFAULT_CALL_SCAN_TIMEOUT_SEC, 3).
208+
205209
-define(DEFAULT_RETRY_POLICY, #{
206210
%% second
207211
initial_timeout => 5,

src/prg_scanner.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,15 @@ start_link({NsId, _NsOpts} = NS) ->
3030
gen_server:start_link({local, RegName}, ?MODULE, NS, []).
3131

3232
init(
33-
{NsId, #{task_scan_timeout := RescanTimeoutSec, process_step_timeout := StepTimeoutSec} = Opts}
33+
{NsId,
34+
#{
35+
task_scan_timeout := RescanTimeoutSec,
36+
call_scan_timeout := CallRescanTimeoutSec,
37+
process_step_timeout := StepTimeoutSec
38+
} = Opts}
3439
) ->
3540
RescanTimeoutMs = RescanTimeoutSec * 1000,
41+
CallRescanTimeoutMs = CallRescanTimeoutSec * 1000,
3642
StepTimeoutMs = StepTimeoutSec * 1000,
3743
State = #prg_scanner_state{
3844
ns_id = NsId,
@@ -44,7 +50,7 @@ init(
4450
case maps:get(worker_pool_size, Opts) > 0 of
4551
true ->
4652
_ = start_rescan_timers(RescanTimeoutMs),
47-
_ = start_rescan_calls((RescanTimeoutMs div 3) + 100),
53+
_ = start_rescan_calls(CallRescanTimeoutMs),
4854
_ = start_zombie_collector(StepTimeoutMs);
4955
false ->
5056
skip_scanning

src/prg_scheduler.erl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
-export([capture_worker/2]).
2222
-export([return_worker/3]).
2323
-export([release_worker/3]).
24+
-export([schedule_task/4]).
25+
26+
%% Deprecated
2427
-export([continuation_task/3]).
2528

2629
-record(prg_scheduler_state, {ns_id, ns_opts, ready, free_workers, owners, wrk_monitors}).
@@ -39,6 +42,7 @@ pop_task(NsId, Worker) ->
3942
RegName = prg_utils:registered_name(NsId, "_scheduler"),
4043
gen_server:call(RegName, {pop_task, Worker}, infinity).
4144

45+
%% Deprecated
4246
-spec continuation_task(namespace_id(), pid(), task()) -> {task_header(), task()} | ok.
4347
continuation_task(NsId, Worker, Task) ->
4448
RegName = prg_utils:registered_name(NsId, "_scheduler"),
@@ -54,16 +58,23 @@ capture_worker(NsId, Owner) ->
5458
RegName = prg_utils:registered_name(NsId, "_scheduler"),
5559
gen_server:call(RegName, {capture_worker, Owner}, infinity).
5660

61+
%% worker is alive and free
5762
-spec return_worker(namespace_id(), pid(), pid()) -> ok.
5863
return_worker(NsId, Owner, Worker) ->
5964
RegName = prg_utils:registered_name(NsId, "_scheduler"),
6065
gen_server:cast(RegName, {return_worker, Owner, Worker}).
6166

67+
%% worker is alive and busy (processing a task)
6268
-spec release_worker(namespace_id(), pid(), pid()) -> ok.
6369
release_worker(NsId, Owner, Pid) ->
6470
RegName = prg_utils:registered_name(NsId, "_scheduler"),
6571
gen_server:cast(RegName, {release_worker, Owner, Pid}).
6672

73+
-spec schedule_task(namespace_id(), id(), task_id(), timeout_ms()) -> ok.
74+
schedule_task(NsId, ProcessId, TaskId, Timeout) ->
75+
RegName = prg_utils:registered_name(NsId, "_scheduler"),
76+
gen_server:cast(RegName, {schedule_task, ProcessId, TaskId, Timeout}).
77+
6778
%%%===================================================================
6879
%%% Spawning and gen_server implementation
6980
%%%===================================================================
@@ -156,6 +167,12 @@ handle_cast(
156167
State#prg_scheduler_state{owners = maps:without([Owner], Owners)}
157168
end,
158169
{noreply, NewState};
170+
handle_cast(
171+
{schedule_task, ProcessId, TaskId, Timeout},
172+
#prg_scheduler_state{} = State
173+
) ->
174+
_TRef = erlang:start_timer(Timeout, self(), {process_scheduled_task, ProcessId, TaskId}),
175+
{noreply, State};
159176
handle_cast(_Request, #prg_scheduler_state{} = State) ->
160177
{noreply, State}.
161178

@@ -173,6 +190,19 @@ handle_info(
173190
MRef = erlang:monitor(process, NewWrk),
174191
NewWrkMonitors = maps:put(NewWrk, MRef, maps:without([Pid], WrkMonitors)),
175192
{noreply, State#prg_scheduler_state{wrk_monitors = NewWrkMonitors}};
193+
handle_info(
194+
{timeout, _TRef, {process_scheduled_task, ProcessId, TaskId}},
195+
#prg_scheduler_state{free_workers = FreeWorkers} = State
196+
) ->
197+
NewState =
198+
case queue:out(FreeWorkers) of
199+
{{value, Worker}, NewWorkers} ->
200+
ok = prg_worker:process_scheduled_task(Worker, ProcessId, TaskId),
201+
State#prg_scheduler_state{free_workers = NewWorkers};
202+
{empty, _} ->
203+
State
204+
end,
205+
{noreply, NewState};
176206
handle_info(_Info, #prg_scheduler_state{} = State) ->
177207
{noreply, State}.
178208

src/prg_storage.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
-export([complete_and_unlock/5]).
2626
-export([complete_and_error/4]).
2727
-export([remove_process/3]).
28+
-export([capture_task/3]).
2829

2930
%% shared functions
3031
-export([get_task/3]).
@@ -147,6 +148,10 @@ complete_and_unlock(
147148
remove_process(#{client := Handler, options := HandlerOpts}, NsId, ProcessId) ->
148149
Handler:remove_process(HandlerOpts, NsId, ProcessId).
149150

151+
-spec capture_task(storage_opts(), namespace_id(), task_id()) -> [task()].
152+
capture_task(#{client := Handler, options := HandlerOpts}, NsId, TaskId) ->
153+
Handler:capture_task(HandlerOpts, NsId, TaskId).
154+
150155
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
151156
%% Shared functions (recipient required)
152157
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

src/prg_utils.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ make_ns_opts(NsId, NsOpts) ->
3535
worker_pool_size => ?DEFAULT_WORKER_POOL_SIZE,
3636
process_step_timeout => ?DEFAULT_STEP_TIMEOUT_SEC,
3737
task_scan_timeout => (?DEFAULT_STEP_TIMEOUT_SEC div 2) + 1,
38+
call_scan_timeout => ?DEFAULT_CALL_SCAN_TIMEOUT_SEC,
3839
last_timer_repair => false
3940
},
4041
ConfigDefaults = application:get_env(progressor, defaults, #{}),

src/prg_worker.erl

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
-export([process_task/3]).
1919
-export([continuation_task/3]).
2020
-export([next_task/1]).
21+
-export([process_scheduled_task/3]).
2122

2223
-record(prg_worker_state, {ns_id, ns_opts, process, sidecar_pid}).
2324

2425
-define(DEFAULT_RANGE, #{direction => forward}).
26+
-define(CAPTURE_DEFENSE_INTERVAL_MS, 100).
2527

2628
%%%
2729
%%% API
@@ -39,6 +41,10 @@ continuation_task(Worker, TaskHeader, Task) ->
3941
next_task(Worker) ->
4042
gen_server:cast(Worker, next_task).
4143

44+
-spec process_scheduled_task(pid(), id(), task_id()) -> ok.
45+
process_scheduled_task(Worker, ProcessId, TaskId) ->
46+
gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId}).
47+
4248
%%%===================================================================
4349
%%% Spawning and gen_server implementation
4450
%%%===================================================================
@@ -90,6 +96,32 @@ handle_cast(
9096
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
9197
NewState = do_process_task(TaskHeader, Task, Deadline, State),
9298
{noreply, NewState};
99+
handle_cast(
100+
{process_scheduled_task, ProcessId, TaskId},
101+
#prg_worker_state{
102+
ns_id = NsId,
103+
ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts,
104+
sidecar_pid = Pid
105+
} = State
106+
) ->
107+
try prg_storage:capture_task(StorageOpts, NsId, TaskId) of
108+
[] ->
109+
%% task cancelled, blocked, already running or finished
110+
ok = next_task(self()),
111+
{noreply, State};
112+
[#{status := <<"running">>} = Task] ->
113+
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
114+
HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}),
115+
{ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange),
116+
TaskHeader = create_header(Task),
117+
NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}),
118+
{noreply, NewState}
119+
catch
120+
Class:Term:Stacktrace ->
121+
logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]),
122+
ok = next_task(self()),
123+
{noreply, State}
124+
end;
93125
handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid}) ->
94126
%% kill sidecar and restart to clear memory
95127
true = erlang:unlink(CurrentPid),
@@ -235,7 +267,9 @@ success_and_continue(Intent, TaskHeader, Task, Deadline, State) ->
235267
),
236268
_ = maybe_reply(TaskHeader, Response),
237269
case SaveResult of
238-
{ok, []} ->
270+
{ok, [#{status := <<"waiting">>, task_id := NextTaskId, scheduled_time := Ts} | _]} ->
271+
RunAfterMs = (Ts - Now) * 1000 - ?CAPTURE_DEFENSE_INTERVAL_MS,
272+
ok = prg_scheduler:schedule_task(NsId, ProcessId, NextTaskId, RunAfterMs),
239273
ok = next_task(self()),
240274
State#prg_worker_state{process = undefined};
241275
{ok, [ContinuationTask | _]} ->
@@ -383,6 +417,7 @@ success_and_unlock(Intent, TaskHeader, Task, Deadline, State) ->
383417
process = #{process_id := ProcessId, status := OldStatus} = Process,
384418
sidecar_pid = Pid
385419
} = State,
420+
Now = erlang:system_time(second),
386421
{#{status := NewStatus} = ProcessUpdated, Updates} = update_process(Process, Intent),
387422
ok = prg_worker_sidecar:lifecycle_sink(
388423
Pid, Deadline, NsOpts, lifecycle_event(TaskHeader, OldStatus, NewStatus), ProcessId
@@ -409,7 +444,12 @@ success_and_unlock(Intent, TaskHeader, Task, Deadline, State) ->
409444
{ok, []} ->
410445
ok = next_task(self()),
411446
State#prg_worker_state{process = undefined};
412-
{ok, [ContinuationTask | _]} ->
447+
{ok, [#{status := <<"waiting">>, task_id := NextTaskId, scheduled_time := Ts} | _]} ->
448+
RunAfterMs = (Ts - Now) * 1000 - ?CAPTURE_DEFENSE_INTERVAL_MS,
449+
ok = prg_scheduler:schedule_task(NsId, ProcessId, NextTaskId, RunAfterMs),
450+
ok = next_task(self()),
451+
State#prg_worker_state{process = undefined};
452+
{ok, [#{status := <<"running">>} = ContinuationTask | _]} ->
413453
NewHistory = maps:get(history, Process) ++ Events,
414454
ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
415455
State#prg_worker_state{
@@ -475,7 +515,17 @@ error_and_retry({error, Reason} = Response, TaskHeader, Task, Deadline, State) -
475515
);
476516
NewTask ->
477517
Updates = #{process_id => ProcessId},
478-
{ok, _} = prg_worker_sidecar:complete_and_continue(
518+
%% prg_storage guarantees that when saving a task with the error status,
519+
%% all deferred tasks of all types will be completed with the canceled status,
520+
%% so calling complete_and_continue is guaranteed to return the retry task,
521+
%% and not any other deferred task
522+
{ok, [
523+
#{
524+
status := <<"waiting">>,
525+
task_id := NextTaskId,
526+
scheduled_time := Ts
527+
}
528+
]} = prg_worker_sidecar:complete_and_continue(
479529
Pid,
480530
Deadline,
481531
StorageOpts,
@@ -484,7 +534,10 @@ error_and_retry({error, Reason} = Response, TaskHeader, Task, Deadline, State) -
484534
Updates,
485535
[],
486536
NewTask
487-
)
537+
),
538+
Now = erlang:system_time(second),
539+
RunAfterMs = (Ts - Now) * 1000 - ?CAPTURE_DEFENSE_INTERVAL_MS,
540+
ok = prg_scheduler:schedule_task(NsId, ProcessId, NextTaskId, RunAfterMs)
488541
end,
489542
ok = next_task(self()),
490543
State#prg_worker_state{process = undefined}.

src/progressor_app.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,56 +51,56 @@ start_namespace(NsID, NsOpts) ->
5151
%% internal functions
5252

5353
create_metrics() ->
54-
_ = prometheus_histogram:new([
54+
_ = prometheus_histogram:declare([
5555
{name, progressor_calls_scanning_duration_ms},
5656
{help, "Calls (call, repair) scanning durations in millisecond"},
5757
{buckets, [50, 150, 300, 500, 750, 1000]},
5858
{labels, [prg_namespace]}
5959
]),
6060

61-
_ = prometheus_histogram:new([
61+
_ = prometheus_histogram:declare([
6262
{name, progressor_timers_scanning_duration_ms},
6363
{help, "Timers (timeout, remove) scanning durations in millisecond"},
6464
{buckets, [50, 150, 300, 500, 750, 1000]},
6565
{labels, [prg_namespace]}
6666
]),
6767

68-
_ = prometheus_histogram:new([
68+
_ = prometheus_histogram:declare([
6969
{name, progressor_zombie_collection_duration_ms},
7070
{help, "Zombie tasks collecting durations in millisecond"},
7171
{buckets, [50, 150, 300, 500, 750, 1000]},
7272
{labels, [prg_namespace]}
7373
]),
7474

75-
_ = prometheus_histogram:new([
75+
_ = prometheus_histogram:declare([
7676
{name, progressor_request_preparing_duration_ms},
7777
{help, "Preparing request (init, call, repair) durations in millisecond"},
7878
{buckets, [50, 150, 300, 500, 750, 1000]},
7979
{labels, [prg_namespace, task_type]}
8080
]),
8181

82-
_ = prometheus_histogram:new([
82+
_ = prometheus_histogram:declare([
8383
{name, progressor_task_processing_duration_ms},
8484
{help, "Task processing durations in millisecond"},
8585
{buckets, [50, 150, 300, 500, 750, 1000]},
8686
{labels, [prg_namespace, task_type]}
8787
]),
8888

89-
_ = prometheus_histogram:new([
89+
_ = prometheus_histogram:declare([
9090
{name, progressor_task_completion_duration_ms},
9191
{help, "Task completion durations in millisecond"},
9292
{buckets, [50, 150, 300, 500, 750, 1000]},
9393
{labels, [prg_namespace, completion_type]}
9494
]),
9595

96-
_ = prometheus_histogram:new([
96+
_ = prometheus_histogram:declare([
9797
{name, progressor_process_removing_duration_ms},
9898
{help, "Task completion durations in millisecond"},
9999
{buckets, [50, 150, 300, 500, 750, 1000]},
100100
{labels, [prg_namespace]}
101101
]),
102102

103-
_ = prometheus_histogram:new([
103+
_ = prometheus_histogram:declare([
104104
{name, progressor_notification_duration_ms},
105105
{help, "Notification durations in millisecond"},
106106
{buckets, [10, 50, 150, 300, 500, 1000]},

0 commit comments

Comments
 (0)