Skip to content

Commit 7fbde7e

Browse files
ttt161ttt161nanodirijabl
authored
Epic/update prod profile (#30)
* update prod profile * debug canal * bump progressor * update configurator * fix aux_state marshaling * fix aux_data marshaling again * fix formatting * fix msgpack marshaling * fix msgpack marshaling * bump progressor, update configurator.escript * fix yaml format * bump progressor-1.0.15 --------- Co-authored-by: ttt161 <losto@nix> Co-authored-by: Aleksey Kashapov <nanodirijabl@gmail.com>
1 parent 198a884 commit 7fbde7e

5 files changed

Lines changed: 147 additions & 26 deletions

File tree

apps/mg_progressor/src/mg_progressor.erl

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ marshal(process, Process) ->
133133
history = maybe_marshal(history, maps:get(history, Process)),
134134
history_range = marshal(history_range, maps:get(history_range, Process)),
135135
status = marshal(status, {maps:get(status, Process), maps:get(detail, Process, undefined)}),
136-
aux_state = maybe_marshal(term, maps:get(aux_state, Process, undefined))
136+
aux_state = to_content(maybe_marshal(term, maps:get(aux_state, Process, undefined)))
137137
};
138138
marshal(history, History) ->
139139
lists:map(fun(Ev) -> marshal(event, Ev) end, History);
@@ -169,3 +169,52 @@ format_version(#{<<"format_version">> := Version}) ->
169169
Version;
170170
format_version(_) ->
171171
undefined.
172+
173+
to_content(undefined) ->
174+
undefined;
175+
to_content(#mg_stateproc_Content{} = Content) ->
176+
Content;
177+
to_content({T, _V} = MsgPackValue) when
178+
T =:= nl;
179+
T =:= b;
180+
T =:= i;
181+
T =:= flt;
182+
T =:= str;
183+
T =:= bin;
184+
T =:= arr;
185+
T =:= obj
186+
->
187+
#mg_stateproc_Content{data = MsgPackValue};
188+
to_content(Data) ->
189+
#mg_stateproc_Content{data = to_msgpack(Data)}.
190+
191+
to_msgpack(undefined) ->
192+
{nl, #mg_msgpack_Nil{}};
193+
to_msgpack(Binary) when is_binary(Binary) ->
194+
{bin, Binary};
195+
to_msgpack(Boolean) when is_boolean(Boolean) ->
196+
{b, Boolean};
197+
to_msgpack(Integer) when is_integer(Integer) ->
198+
{i, Integer};
199+
to_msgpack(Float) when is_float(Float) ->
200+
{flt, Float};
201+
to_msgpack(Array) when is_list(Array) ->
202+
{arr, lists:map(fun to_msgpack/1, Array)};
203+
to_msgpack(Object) when is_map(Object) ->
204+
try
205+
maps:fold(
206+
fun(K, V, Acc) ->
207+
maps:put(to_msgpack(K), to_msgpack(V), Acc)
208+
end,
209+
#{},
210+
Object
211+
)
212+
of
213+
Data ->
214+
{obj, Data}
215+
catch
216+
_:_ ->
217+
{bin, erlang:term_to_binary(Object)}
218+
end;
219+
to_msgpack(Term) ->
220+
{bin, erlang:term_to_binary(Term)}.

config/config.yaml

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,25 @@ postgres:
354354
## Progressor namespaces settings
355355

356356
progressor:
357-
prg_test_ns: some_processor_pool1
357+
prg_test_ns_2:
358+
# required
359+
storage:
360+
# optional
361+
client: prg_pg_backend
362+
# required
363+
options:
364+
# required
365+
pool: another_processor_pool
366+
# optional
367+
notifier:
368+
# required
369+
client: default_kafka_client
370+
# required
371+
options:
372+
# required
373+
topic: eventsink_topic
374+
# required
375+
lifecycle_topic: lifecycle_topic
358376

359377
# Optional section
360378
# if not defined then canal will not be started

rebar.config

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
{deps, [
5353
{genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}},
54-
{progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.1"}}},
54+
{progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.15"}}},
5555
% for configurator script
5656
{yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}},
5757
{cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}}
@@ -76,6 +76,7 @@
7676
{tools, load},
7777
% log formatter
7878
{logger_logstash_formatter, load},
79+
{canal, load},
7980
% main app
8081
{machinegun, permanent}
8182
]},

rebar.lock

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
{<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},1},
66
{<<"canal">>,
77
{git,"https://github.com/valitydev/canal",
8-
{ref,"621d3821cd0a6036fee75d8e3b2d17167f3268e4"}},
8+
{ref,"89faedce3b054bcca7cc31ca64d2ead8a9402305"}},
99
2},
1010
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2},
1111
{<<"cg_mon">>,
@@ -19,11 +19,11 @@
1919
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2},
2020
{<<"epg_connector">>,
2121
{git,"https://github.com/valitydev/epg_connector.git",
22-
{ref,"dd93e27c00d492169e8a7bfc38976b911c6e7d05"}},
22+
{ref,"af35200fa1c63e7afeaa90cad862944c194b2686"}},
2323
1},
2424
{<<"epgsql">>,
2525
{git,"https://github.com/epgsql/epgsql.git",
26-
{ref,"7ba52768cf0ea7d084df24d4275a88eef4db13c2"}},
26+
{ref,"28e9f84c95065a51e92baeb37d2cf1687fc4b9ce"}},
2727
2},
2828
{<<"erl_health">>,
2929
{git,"https://github.com/valitydev/erlang-health",
@@ -36,10 +36,6 @@
3636
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0},
3737
{<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1},
3838
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1},
39-
{<<"hamcrest">>,
40-
{git,"https://github.com/basho/hamcrest-erlang.git",
41-
{ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}},
42-
2},
4339
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3},
4440
{<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2},
4541
{<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3},
@@ -67,7 +63,7 @@
6763
0},
6864
{<<"progressor">>,
6965
{git,"https://github.com/valitydev/progressor.git",
70-
{ref,"6df2e447a867434ad45bfc3540c4681e10105e02"}},
66+
{ref,"fdebffd0db07208faa452e0151491769949a5076"}},
7167
0},
7268
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0},
7369
{<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0},

rel_scripts/configurator.escript

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -628,39 +628,96 @@ pg_pool_opts(PoolOpts) ->
628628

629629
progressor(YamlConfig) ->
630630
PrgNamespaces = lists:foldl(
631-
fun({NsName, NsPgPool}, Acc) ->
632-
Acc#{?C:atom(NsName) => prg_namespace(?C:atom(NsPgPool))}
631+
fun({NsName, NsOpts}, Acc) ->
632+
Acc#{?C:atom(NsName) => prg_namespace(NsOpts)}
633633
end,
634634
#{},
635635
?C:conf([progressor], YamlConfig, [])
636636
),
637-
[{namespaces, PrgNamespaces}].
637+
[{namespaces, PrgNamespaces}, {migration_enabled, false}].
638638

639-
prg_namespace(NsPgPool) ->
640-
#{
641-
storage => #{
642-
client => prg_pg_backend,
643-
options => #{pool => NsPgPool}
644-
},
645-
processor => #{
646-
%% Never will be called
647-
client => null
648-
},
639+
prg_namespace(NsOptsList) ->
640+
InitAcc = #{
641+
processor => #{client => null},
649642
worker_pool_size => 0
643+
},
644+
lists:foldl(
645+
fun
646+
({<<"storage">>, StorageOpts}, Acc) -> Acc#{storage => prg_storage_opts(StorageOpts)};
647+
({<<"notifier">>, NotifierOpts}, Acc) -> Acc#{notifier => prg_notifier_opts(NotifierOpts)};
648+
(_, Acc) -> Acc
649+
end,
650+
InitAcc,
651+
NsOptsList
652+
).
653+
654+
prg_storage_opts(StorageOpts) ->
655+
Client = ?C:atom(?C:conf([client], StorageOpts, <<"prg_pg_backend">>)),
656+
OptsList = ?C:conf([options], StorageOpts),
657+
#{
658+
client => Client,
659+
options => prg_storage_handler_opts(Client, OptsList)
660+
}.
661+
662+
prg_storage_handler_opts(prg_pg_backend, OptsList) ->
663+
#{pool => ?C:atom(?C:conf([pool], OptsList))}.
664+
665+
prg_notifier_opts(NotifierOpts) ->
666+
Client = ?C:atom(?C:conf([client], NotifierOpts)),
667+
OptsList = ?C:conf([options], NotifierOpts),
668+
#{
669+
client => Client,
670+
options => #{
671+
topic => ?C:conf([topic], OptsList),
672+
lifecycle_topic => ?C:conf([lifecycle_topic], OptsList)
673+
}
650674
}.
651675

652676
canal(YamlConfig) ->
677+
Default = [
678+
{httpc_options, [{ssl, [{verify, verify_none}]}]},
679+
{kvv2_secret_mount_path, "/secret/data/"}
680+
],
653681
lists:foldl(
654682
fun
655683
({<<"url">>, Url}, Acc) ->
656684
[{url, unicode:characters_to_list(Url)} | Acc];
657685
({<<"engine">>, Value}, Acc) ->
658-
[{engine, ?C:atom(Value)} | Acc]
686+
[{engine, ?C:atom(Value)} | Acc];
687+
({<<"httpc_options">>, Value}, Acc) ->
688+
[{httpc_options, canal_httpc_options(Value)} | Acc];
689+
({<<"kvv2_secret_mount_path">>, Value}, Acc) ->
690+
[{kvv2_secret_mount_path, unicode:characters_to_list(Value)} | Acc]
659691
end,
660-
[],
692+
Default,
661693
?C:conf([canal], YamlConfig, [])
662694
).
663695

696+
canal_httpc_options(Value) ->
697+
lists:foldl(
698+
fun
699+
({<<"ssl">>, SslOpts}, Acc) ->
700+
[{ssl, ssl_opts(SslOpts)} | Acc];
701+
(_, Acc) ->
702+
Acc
703+
end,
704+
[],
705+
Value
706+
).
707+
708+
ssl_opts(Opts) ->
709+
Default = [{ssl, [{verify, verify_none}]}],
710+
lists:foldl(
711+
fun
712+
({<<"verify">>, Value}, Acc) ->
713+
[{verify, ?C:atom(Value)} | Acc];
714+
(_, Acc) ->
715+
Acc
716+
end,
717+
Default,
718+
Opts
719+
).
720+
664721
%%
665722
%% vm.args
666723
%%

0 commit comments

Comments
 (0)