Skip to content

Commit fb52782

Browse files
ttt161ttt161WWWcool
authored
TD-814: add cluster module (#24)
* TD-814: add cluster module * TD-814: add logging * TD-814: fix * TD-814: fix style * TD-814: add tests * TD-814: bump github workflows * TD-814: fix spec * TD-814: add test * Update src/mg_core_union.erl Co-authored-by: Артем <WWW_cool@inbox.ru> * TD-814: fix issues * TD-814: full coverage --------- Co-authored-by: ttt161 <losto@nix> Co-authored-by: Артем <WWW_cool@inbox.ru>
1 parent 59e3de7 commit fb52782

4 files changed

Lines changed: 388 additions & 3 deletions

File tree

.github/workflows/erlang-checks.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
run:
2727
name: Run checks
2828
needs: setup
29-
uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.12
29+
uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.13
3030
with:
3131
otp-version: ${{ needs.setup.outputs.otp-version }}
3232
rebar-version: ${{ needs.setup.outputs.rebar-version }}

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ eunit:
8787
$(REBAR) eunit --cover
8888

8989
common-test:
90-
$(REBAR) ct --cover
90+
$(REBAR) ct --cover --name test_node@127.0.0.1
9191

9292
common-test.%: test/mg_core_%_SUITE.erl
93-
$(REBAR) ct --cover --suite=$^
93+
$(REBAR) ct --cover --suite=$^ --name test_node@127.0.0.1
9494

9595
cover:
9696
$(REBAR) covertool generate

src/mg_core_union.erl

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
-module(mg_core_union).
2+
3+
-behaviour(gen_server).
4+
5+
-export([start_link/1]).
6+
-export([
7+
init/1,
8+
handle_continue/2,
9+
handle_call/3,
10+
handle_cast/2,
11+
handle_info/2,
12+
terminate/2,
13+
code_change/3
14+
]).
15+
16+
-export([child_spec/1]).
17+
-export([discovery/1]).
18+
-export([cluster_size/0]).
19+
20+
-ifdef(TEST).
21+
-export([set_state/1]).
22+
-endif.
23+
24+
-define(SERVER, ?MODULE).
25+
-define(RECONNECT_TIMEOUT, 5000).
26+
27+
-type discovery_options() :: #{
28+
module := module(),
29+
%% options is module specific structure
30+
options := term()
31+
}.
32+
-type dns_discovery_options() :: #{
33+
%% #{<<"domain_name">> => <<"machinegun-ha-headless">>,<<"sname">> => <<"machinegun">>}
34+
binary() => binary()
35+
}.
36+
-type cluster_options() :: #{
37+
discovery => discovery_options(),
38+
reconnect_timeout => non_neg_integer()
39+
}.
40+
-type state() :: #{
41+
known_nodes => [node()],
42+
discovery => discovery_options(),
43+
reconnect_timeout => non_neg_integer()
44+
}.
45+
46+
%% discovery behaviour callback
47+
-callback discovery(dns_discovery_options()) -> {ok, [node()]}.
48+
49+
-spec child_spec(cluster_options()) -> [supervisor:child_spec()].
50+
child_spec(#{discovery := _} = ClusterOpts) ->
51+
[
52+
#{
53+
id => ?MODULE,
54+
start => {?MODULE, start_link, [ClusterOpts]}
55+
}
56+
];
57+
child_spec(_) ->
58+
% cluster not configured, skip
59+
[].
60+
61+
-spec discovery(dns_discovery_options()) -> {ok, [node()]}.
62+
discovery(#{<<"domain_name">> := DomainName, <<"sname">> := Sname}) ->
63+
case get_addrs(unicode:characters_to_list(DomainName)) of
64+
{ok, ListAddrs} ->
65+
logger:info("union. resolve ~p with result: ~p", [DomainName, ListAddrs]),
66+
{ok, addrs_to_nodes(lists:uniq(ListAddrs), Sname)};
67+
Error ->
68+
error({resolve_error, Error})
69+
end.
70+
71+
-ifdef(TEST).
72+
-spec set_state(state()) -> ok.
73+
set_state(NewState) ->
74+
gen_server:call(?MODULE, {set_state, NewState}).
75+
-endif.
76+
77+
-spec cluster_size() -> non_neg_integer().
78+
cluster_size() ->
79+
case whereis(?MODULE) of
80+
undefined ->
81+
%% for backward compatibility with consul
82+
ReplicaCount = os:getenv("REPLICA_COUNT", "1"),
83+
erlang:list_to_integer(ReplicaCount);
84+
Pid when is_pid(Pid) ->
85+
gen_server:call(Pid, get_cluster_size)
86+
end.
87+
88+
%%%===================================================================
89+
%%% Spawning and gen_server implementation
90+
%%%===================================================================
91+
-spec start_link(cluster_options()) -> {ok, pid()} | {error, term()}.
92+
start_link(ClusterOpts) ->
93+
gen_server:start_link({local, ?SERVER}, ?MODULE, ClusterOpts, []).
94+
95+
-spec init(cluster_options()) -> {ok, state(), {continue, {full_init, cluster_options()}}}.
96+
init(ClusterOpts) ->
97+
logger:info("union. init with options: ~p", [ClusterOpts]),
98+
{ok, #{}, {continue, {full_init, ClusterOpts}}}.
99+
100+
-spec handle_continue({full_init, cluster_options()}, state()) -> {noreply, state()}.
101+
handle_continue({full_init, #{discovery := #{module := Mod, options := Opts}} = ClusterOpts}, _State) ->
102+
_ = net_kernel:monitor_nodes(true),
103+
{ok, ListNodes} = Mod:discovery(Opts),
104+
_ = try_connect_all(ListNodes, maps:get(reconnect_timeout, ClusterOpts)),
105+
{noreply, ClusterOpts#{known_nodes => ListNodes}}.
106+
107+
-spec handle_call(term(), {pid(), _}, state()) -> {reply, any(), state()}.
108+
-ifdef(TEST).
109+
handle_call({set_state, NewState}, _From, _State) ->
110+
{reply, ok, NewState};
111+
handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) ->
112+
{reply, erlang:length(ListNodes), State}.
113+
-else.
114+
handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) ->
115+
{reply, erlang:length(ListNodes), State}.
116+
-endif.
117+
118+
-spec handle_cast(term(), state()) -> {noreply, state()}.
119+
handle_cast(_Request, State) ->
120+
{noreply, State}.
121+
122+
-spec handle_info(term(), state()) -> {noreply, state()}.
123+
handle_info({timeout, _TRef, {reconnect, Node}}, State) ->
124+
ListNodes = maybe_connect(Node, State),
125+
{noreply, State#{known_nodes => ListNodes}};
126+
handle_info({nodeup, RemoteNode}, #{known_nodes := ListNodes} = State) ->
127+
logger:info("union. ~p receive nodeup ~p", [node(), RemoteNode]),
128+
NewState =
129+
case lists:member(RemoteNode, ListNodes) of
130+
true ->
131+
%% well known node connected, do nothing
132+
State;
133+
false ->
134+
%% new node connected, need update list nodes
135+
#{discovery := #{module := Mod, options := Opts}, reconnect_timeout := Timeout} = State,
136+
{ok, NewListNodes} = Mod:discovery(Opts),
137+
_ = try_connect_all(NewListNodes, Timeout),
138+
State#{known_nodes => NewListNodes}
139+
end,
140+
{noreply, NewState};
141+
handle_info({nodedown, RemoteNode}, #{reconnect_timeout := Timeout} = State) ->
142+
logger:warning("union. ~p receive nodedown ~p", [node(), RemoteNode]),
143+
_ = erlang:start_timer(Timeout, self(), {reconnect, RemoteNode}),
144+
{noreply, State}.
145+
146+
-spec terminate(_Reason, state()) -> ok.
147+
terminate(_Reason, _State) ->
148+
ok.
149+
150+
-spec code_change(_OldVsn, state(), _Extra) -> {ok, state()}.
151+
code_change(_OldVsn, State, _Extra) ->
152+
{ok, State}.
153+
154+
%%%===================================================================
155+
%%% Internal functions
156+
%%%===================================================================
157+
158+
%% cluster functions
159+
-spec connect(node(), non_neg_integer()) -> ok | error.
160+
connect(Node, ReconnectTimeout) ->
161+
case net_adm:ping(Node) of
162+
pong ->
163+
ok;
164+
_ ->
165+
_ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}),
166+
error
167+
end.
168+
169+
-spec try_connect_all([node()], non_neg_integer()) -> ok.
170+
try_connect_all(ListNodes, ReconnectTimeout) ->
171+
_ = lists:foreach(fun(Node) -> connect(Node, ReconnectTimeout) end, ListNodes).
172+
173+
-spec maybe_connect(node(), state()) -> [node()].
174+
maybe_connect(Node, #{discovery := #{module := Mod, options := Opts}, reconnect_timeout := Timeout}) ->
175+
{ok, ListNodes} = Mod:discovery(Opts),
176+
case lists:member(Node, ListNodes) of
177+
false ->
178+
%% node deleted from cluster, do nothing
179+
skip;
180+
true ->
181+
connect(Node, Timeout)
182+
end,
183+
ListNodes.
184+
185+
%% discovery functions
186+
-spec get_addrs(inet:hostname()) -> {ok, [inet:ip_address()]} | {error, _}.
187+
get_addrs(DomainName) ->
188+
case inet:getaddrs(DomainName, inet) of
189+
{ok, _} = Ok -> Ok;
190+
_ -> inet:getaddrs(DomainName, inet6)
191+
end.
192+
193+
-spec addrs_to_nodes([inet:ip_address()], binary()) -> [node()].
194+
addrs_to_nodes(ListAddrs, Sname) ->
195+
NodeName = unicode:characters_to_list(Sname),
196+
lists:foldl(
197+
fun(Addr, Acc) ->
198+
[erlang:list_to_atom(NodeName ++ "@" ++ inet:ntoa(Addr)) | Acc]
199+
end,
200+
[],
201+
ListAddrs
202+
).
203+
204+
-ifdef(TEST).
205+
-include_lib("eunit/include/eunit.hrl").
206+
207+
-define(CLUSTER_OPTS, #{
208+
discovery => #{
209+
module => mg_core_union,
210+
options => #{
211+
<<"domain_name">> => <<"localhost">>,
212+
<<"sname">> => <<"test_node">>
213+
}
214+
},
215+
reconnect_timeout => ?RECONNECT_TIMEOUT
216+
}).
217+
218+
-spec test() -> _.
219+
220+
-spec connect_error_test() -> _.
221+
connect_error_test() ->
222+
?assertEqual(error, connect('foo@127.0.0.1', 3000)).
223+
224+
-spec child_spec_test() -> _.
225+
child_spec_test() ->
226+
EmptyChildSpec = mg_core_union:child_spec(#{}),
227+
?assertEqual([], EmptyChildSpec),
228+
ExpectedSpec = [
229+
#{
230+
id => mg_core_union,
231+
start => {
232+
mg_core_union,
233+
start_link,
234+
[?CLUSTER_OPTS]
235+
}
236+
}
237+
],
238+
ChildSpec = mg_core_union:child_spec(?CLUSTER_OPTS),
239+
?assertEqual(ExpectedSpec, ChildSpec).
240+
241+
-spec nxdomain_test() -> _.
242+
nxdomain_test() ->
243+
?assertError(
244+
{resolve_error, {error, nxdomain}},
245+
mg_core_union:discovery(#{<<"domain_name">> => <<"bad_name">>, <<"sname">> => <<"mg">>})
246+
).
247+
248+
-spec for_full_cover_test() -> _.
249+
for_full_cover_test() ->
250+
?assertEqual({noreply, #{}}, handle_cast([], #{})),
251+
?assertEqual(ok, terminate(term, #{})),
252+
?assertEqual({ok, #{}}, code_change(old, #{}, extra)).
253+
254+
-endif.

0 commit comments

Comments
 (0)