2 changes: 1 addition & 1 deletion Makefile
@@ -16,7 +16,7 @@ DEV_IMAGE_ID = $(file < .image.dev)

DOCKER ?= docker
DOCKERCOMPOSE ?= docker compose
DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml
DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml
REBAR ?= rebar3
TEST_CONTAINER_NAME ?= testrunner

33 changes: 33 additions & 0 deletions compose.tracing.yaml
@@ -0,0 +1,33 @@
services:

  postgres:
    environment: &otlp_enabled
      OTEL_TRACES_EXPORTER: otlp
      OTEL_TRACES_SAMPLER: parentbased_always_off
      OTEL_EXPORTER_OTLP_PROTOCOL: http_protobuf
      OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4318

  testrunner:
    environment:
      <<: *otlp_enabled
      OTEL_SERVICE_NAME: progressor_testrunner
      OTEL_TRACES_SAMPLER: parentbased_always_on
    depends_on:
      jaeger:
        condition: service_healthy

  jaeger:
    image: jaegertracing/all-in-one:1.47
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    healthcheck:
      test: "/go/bin/all-in-one-linux status"
      interval: 2s
      timeout: 1s
      retries: 20
    ports:
      - 4317:4317 # OTLP gRPC receiver
      - 4318:4318 # OTLP http receiver
      - 5778:5778
      - 14250:14250
      - 16686:16686
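The OTEL_* variables above follow the standard OpenTelemetry SDK conventions and are read by the Erlang SDK inside the testrunner at application start. As a rough sketch of how that is typically wired in (an assumption about the test setup, not part of this change), a suite init could simply make sure the exporter applications are running so the variables take effect:

%% Hypothetical sketch: ensure the OTel apps are started in the testrunner so
%% the OTEL_* variables injected by compose.tracing.yaml are honoured.
init_per_suite(Config) ->
    {ok, _} = application:ensure_all_started(opentelemetry_exporter),
    {ok, _} = application:ensure_all_started(opentelemetry),
    Config.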
26 changes: 26 additions & 0 deletions include/otel.hrl
@@ -0,0 +1,26 @@
-ifndef(__progressor_otel__).
-define(__progressor_otel__, ok).

-define(current_otel_ctx, otel_ctx:get_current()).

-define(current_span_ctx, otel_tracer:current_span_ctx(?current_otel_ctx)).

-define(span_exception(Class, Error, Stacktrace),
    otel_span:record_exception(?current_span_ctx, Class, Error, Stacktrace, #{})
).
-define(span_exception(Class, Error, Message, Stacktrace),
    otel_span:record_exception(?current_span_ctx, Class, Error, Message, Stacktrace, #{})
).

-define(span_event(EventName), otel_span:add_event(?current_span_ctx, EventName, #{})).

-define(span_attributes(Attributes), otel_span:set_attributes(?current_span_ctx, Attributes)).

-define(tracer, opentelemetry:get_application_tracer(?MODULE)).

-define(with_span(OtelCtx, SpanName, Fun),
    otel_tracer:with_span(OtelCtx, ?tracer, SpanName, #{kind => internal}, fun(_SpanCtx) -> Fun() end)
).
-define(with_span(SpanName, Fun), ?with_span(?current_otel_ctx, SpanName, Fun)).

-endif.
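For orientation, these macros are meant to be used together: capture the current context with ?current_otel_ctx on the sending side, then re-enter it inside a span with ?with_span on the receiving side, which is the pattern prg_worker adopts below. A minimal sketch, with a hypothetical module and function names that are not part of this change:

%% Hypothetical example only: wrap a unit of work in a span attached to a
%% previously captured context, annotate it, and record any exception.
-module(otel_usage_example).
-include("otel.hrl").

-export([run/1]).

run(Task) ->
    OtelCtx = ?current_otel_ctx,
    ?with_span(OtelCtx, <<"example task">>, fun() ->
        ?span_attributes(#{<<"example.task_id">> => maps:get(task_id, Task, undefined)}),
        ?span_event(<<"task started">>),
        try
            do_work(Task)
        catch
            Class:Error:Stacktrace ->
                ?span_exception(Class, Error, Stacktrace),
                erlang:raise(Class, Error, Stacktrace)
        end
    end).

do_work(_Task) ->
    ok.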
7 changes: 5 additions & 2 deletions rebar.config
@@ -5,7 +5,8 @@
{recon, "2.5.6"},
{thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}},
{mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}},
{epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}}
{epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}},
{opentelemetry_api, "1.4.0"}
]}.

{xref_checks, [
Expand Down Expand Up @@ -38,7 +39,9 @@
{profiles, [
{test, [
{deps, [
{meck, "0.9.2"}
{meck, "0.9.2"},
{opentelemetry, "1.5.0"},
{opentelemetry_exporter, "1.8.0"}
]},
{dialyzer, [{plt_extra_apps, [eunit, common_test, runtime_tools, meck]}]}
]}
5 changes: 4 additions & 1 deletion rebar.lock
@@ -18,8 +18,9 @@
{<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},1},
{<<"mg_proto">>,
{git,"https://github.com/valitydev/machinegun-proto.git",
{ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}},
{ref,"cc2c27c30d30dc34c0c56fc7c7e96326d6bd6a14"}},
0},
{<<"opentelemetry_api">>,{pkg,<<"opentelemetry_api">>,<<"1.4.0">>},0},
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0},
{<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1},
{<<"recon">>,{pkg,<<"recon">>,<<"2.5.6">>},0},
@@ -34,6 +35,7 @@
{<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>},
{<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>},
{<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>},
{<<"opentelemetry_api">>, <<"63CA1742F92F00059298F478048DFB826F4B20D49534493D6919A0DB39B6DB04">>},
{<<"prometheus">>, <<"B95F8DE8530F541BD95951E18E355A840003672E5EDA4788C5FA6183406BA29A">>},
{<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>},
{<<"recon">>, <<"9052588E83BFEDFD9B72E1034532AEE2A5369D9D9343B61AEB7FBCE761010741">>}]},
@@ -43,6 +45,7 @@
{<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>},
{<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>},
{<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>},
{<<"opentelemetry_api">>, <<"3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58">>},
{<<"prometheus">>, <<"719862351AABF4DF7079B05DC085D2BBCBE3AC0AC3009E956671B1D5AB88247D">>},
{<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>},
{<<"recon">>, <<"96C6799792D735CC0F0FD0F86267E9D351E63339CBE03DF9D162010CEFC26BB0">>}]}
102 changes: 65 additions & 37 deletions src/prg_worker.erl
@@ -3,6 +3,7 @@
-behaviour(gen_server).

-include("progressor.hrl").
-include("otel.hrl").

-export([start_link/2]).
-export([
@@ -31,19 +32,20 @@

-spec process_task(pid(), task_header(), task()) -> ok.
process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task) ->
gen_server:cast(Worker, {process_task, TaskHeader, Task}).
gen_server:cast(Worker, {process_task, TaskHeader, Task, ?current_otel_ctx}).

-spec continuation_task(pid(), task_header(), task()) -> ok.
continuation_task(Worker, TaskHeader, Task) ->
gen_server:cast(Worker, {continuation_task, TaskHeader, Task}).
gen_server:cast(Worker, {continuation_task, TaskHeader, Task, ?current_otel_ctx}).

-spec next_task(pid()) -> ok.
next_task(Worker) ->
_ = ?span_event(<<"next task">>),
gen_server:cast(Worker, next_task).

-spec process_scheduled_task(pid(), id(), task_id()) -> ok.
process_scheduled_task(Worker, ProcessId, TaskId) ->
gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId}).
gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId, ?current_otel_ctx}).

%%%===================================================================
%%% Spawning and gen_server implementation
@@ -61,6 +63,7 @@ init([NsId, NsOpts]) ->
{continue, do_start}}.

handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) ->
%% FIXME Worker w/o OTEL context, since it is not passed to init w/ `start_child`
{ok, Pid} = prg_worker_sidecar:start_link(),
case prg_scheduler:pop_task(NsId, self()) of
{TaskHeader, Task} ->
@@ -74,54 +77,76 @@ handle_call(_Request, _From, #prg_worker_state{} = State) ->
{reply, ok, State}.

handle_cast(
{process_task, TaskHeader, Task},
{process_task, TaskHeader, Task, OtelCtx},
#prg_worker_state{
ns_id = NsId,
ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts,
sidecar_pid = Pid
} = State
) ->
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
ProcessId = maps:get(process_id, Task),
HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}),
{ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange),
NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}),
{noreply, NewState};
?with_span(OtelCtx, <<"process task">>, fun() ->
ProcessId = maps:get(process_id, Task),
?span_attributes(#{
<<"progressor.process.type">> => atom_to_binary(element(1, TaskHeader)),
<<"progressor.process.id">> => ProcessId,
<<"progressor.process.namespace">> => NsId,
<<"progressor.process.task_id">> => maps:get(task_id, Task, undefined)
}),
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}),
{ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange),
NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}),
{noreply, NewState}
end);
handle_cast(
{continuation_task, TaskHeader, Task},
{continuation_task, TaskHeader, Task, OtelCtx},
#prg_worker_state{
ns_id = NsId,
ns_opts = #{process_step_timeout := TimeoutSec}
} = State
) ->
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
NewState = do_process_task(TaskHeader, Task, Deadline, State),
{noreply, NewState};
?with_span(OtelCtx, <<"process continuation">>, fun() ->
ProcessId = maps:get(process_id, Task),
?span_attributes(#{
<<"progressor.process.type">> => atom_to_binary(element(1, TaskHeader)),
<<"progressor.process.id">> => ProcessId,
<<"progressor.process.namespace">> => NsId,
<<"progressor.process.task_id">> => maps:get(task_id, Task, undefined)
}),
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
NewState = do_process_task(TaskHeader, Task, Deadline, State),
{noreply, NewState}
end);
handle_cast(
{process_scheduled_task, ProcessId, TaskId},
{process_scheduled_task, ProcessId, TaskId, OtelCtx},
#prg_worker_state{
ns_id = NsId,
ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts,
sidecar_pid = Pid
} = State
) ->
try prg_storage:capture_task(StorageOpts, NsId, TaskId) of
[] ->
%% task cancelled, blocked, already running or finished
ok = next_task(self()),
{noreply, State};
[#{status := <<"running">>} = Task] ->
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}),
{ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange),
TaskHeader = create_header(Task),
NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}),
{noreply, NewState}
catch
Class:Term:Stacktrace ->
logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]),
ok = next_task(self()),
{noreply, State}
end;
?with_span(OtelCtx, <<"process scheduled task">>, fun() ->
try prg_storage:capture_task(StorageOpts, NsId, TaskId) of
[] ->
%% task cancelled, blocked, already running or finished
ok = next_task(self()),
{noreply, State};
[#{status := <<"running">>} = Task] ->
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}),
{ok, Process} = prg_worker_sidecar:get_process(
Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange
),
TaskHeader = create_header(Task),
NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}),
{noreply, NewState}
catch
Class:Term:Stacktrace ->
logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]),
ok = next_task(self()),
{noreply, State}
end
end);
handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid}) ->
%% kill sidecar and restart to clear memory
true = erlang:unlink(CurrentPid),
@@ -153,10 +178,12 @@ do_process_task(
sidecar_pid = Pid
} = State
) ->
ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId),
ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId),
ok = next_task(self()),
State#prg_worker_state{process = undefined};
?with_span(<<"remove process">>, fun() ->
ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId),
ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId),
ok = next_task(self()),
State#prg_worker_state{process = undefined}
end);
do_process_task(
TaskHeader,
Task,
@@ -378,6 +405,7 @@ success_and_unlock(
last_retry_interval => 0,
attempts_count => 0
},
%% FIXME Otel must drop trace here - right before moving to other tasks
{ok, [ContinuationTask | _]} = prg_worker_sidecar:complete_and_continue(
Pid,
Deadline,