Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/syn.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,23 @@
ApiCall :: atom(),
Version :: atom()
}.
-type syn_registry_entry() :: {
%%-type syn_registry_entry() :: {
%% Name :: term(),
%% Pid :: pid(),
%% Meta :: term(),
%% Time :: integer(),
%% MRef :: undefined | reference(),
%% Node :: node()
%%}.
-type syn_registry_entry() :: { %%Removed duplicated meta information.
Name :: term(),
Pid :: pid(),
Meta :: term(),
Time :: integer(),
MRef :: undefined | reference(),
Node :: node()
}.
-type syn_registry_entry_by_pid() :: {

-type syn_registry_entry_by_pid() :: { %%kept only here (First element of tuple becomes a key in ETS)
Pid :: pid(),
Name :: term(),
Meta :: term(),
Expand Down
126 changes: 67 additions & 59 deletions src/syn_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,15 @@ subcluster_nodes(Scope) ->

-spec lookup(Scope :: atom(), Name :: term()) -> {pid(), Meta :: term()} | undefined.
lookup(Scope, Name) ->
case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
undefined ->
error({invalid_scope, Scope});

TableByName ->
case find_registry_entry_by_name(Name, TableByName) of
TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
TableByPid = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
case {TableByName, TableByPid} of
{undefined, _} -> error({invalid_scope, Scope});
{_, undefined} -> error({invalid_scope, Scope});
_ ->
case find_registry_entry_by_name(Name, TableByName, TableByPid) of
undefined -> undefined;
{Name, Pid, Meta, _, _, Node} ->
% This read can be initiated prior to registration, by a
% supervisor or supervisor-like process trying to restart a
% stopped process in response to a 'DOWN', while the 'DOWN'
% handler in this module is yet to update TableByName.
%
% Verifying aliveness avoids confusing already_started
% errors while restarting registered processes.
% The aliveness check is only necessary when the Pid is
% local.
case Node =:= node() andalso not is_process_alive(Pid) of
true -> undefined;
false -> {Pid, Meta}
Expand All @@ -107,16 +99,14 @@ register(Scope, Name, Pid, Meta) ->
-spec update(Scope :: atom(), Name :: term(), Fun :: function()) ->
{ok, {Pid :: pid(), Meta :: term()}} | {error, Reason :: term()}.
update(Scope, Name, Fun) ->
case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
undefined ->
error({invalid_scope, Scope});

TableByName ->
% get process' node
case find_registry_entry_by_name(Name, TableByName) of
undefined ->
{error, undefined};

TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
TableByPid = syn_backbone:get_table_name(syn_registry_by_pid, Scope),
case {TableByName, TableByPid} of
{undefined, _} -> error({invalid_scope, Scope});
{_, undefined} -> error({invalid_scope, Scope});
_ ->
case find_registry_entry_by_name(Name, TableByName, TableByPid) of
undefined -> {error, undefined};
{Name, Pid, _, _, _, _} ->
register_or_update(Scope, Name, Pid, Fun)
end
Expand Down Expand Up @@ -155,22 +145,24 @@ register_or_update(Scope, Name, Pid, MetaOrFun) ->
end
end.

-spec unregister(Scope :: atom(), Name :: term()) -> ok | {error, Reason :: term()}.
-spec unregister(Scope :: atom(), Name :: term()) -> ok | {error, Reason :: term()}.
unregister(Scope, Name) ->
case syn_backbone:get_table_name(syn_registry_by_name, Scope) of
undefined ->
error({invalid_scope, Scope});

TableByName ->
TableByName = syn_backbone:get_table_name(syn_registry_by_name, Scope),
TableByPid = syn_backbone:get_table_name(syn_registry_by_pid, Scope),

case {TableByName, TableByPid} of
{undefined, _} -> error({invalid_scope, Scope});
{_, undefined} -> error({invalid_scope, Scope});
_ ->
% get process' node
case find_registry_entry_by_name(Name, TableByName) of
case find_registry_entry_by_name(Name, TableByName, TableByPid) of
undefined ->
{error, undefined};

{Name, Pid, Meta, _, _, _} ->
Node = node(Pid),
case syn_gen_scope:call(?MODULE, Node, Scope, {'3.0', unregister_on_node, node(), Name, Pid}) of
{ok, TableByPid} when Node =/= node() ->
{ok, _RemoteTableByPid} when Node =/= node() ->
%% remove table on caller node immediately so that subsequent calls have an updated registry
remove_from_local_table(Name, Pid, TableByName, TableByPid),
%% callback
Expand Down Expand Up @@ -247,7 +239,7 @@ handle_call({'3.0', register_or_update_on_node, RequesterNode, Name, Pid, MetaOr
} = State) ->
case is_process_alive(Pid) of
true ->
case find_registry_entry_by_name(Name, TableByName) of
case find_registry_entry_by_name(Name, TableByName, TableByPid) of
undefined when is_function(MetaOrFun) ->
{reply, {error, undefined}, State};

Expand Down Expand Up @@ -297,7 +289,7 @@ handle_call({'3.0', unregister_on_node, RequesterNode, Name, Pid}, _From, #state
table_by_name = TableByName,
table_by_pid = TableByPid
} = State) ->
case find_registry_entry_by_name(Name, TableByName) of
case find_registry_entry_by_name(Name, TableByName, TableByPid) of
undefined ->
{reply, {error, undefined}, State};

Expand Down Expand Up @@ -346,7 +338,7 @@ handle_info({'3.0', sync_unregister, Name, Pid, Meta, Reason}, #state{
table_by_name = TableByName,
table_by_pid = TableByPid
} = State) ->
case find_registry_entry_by_name(Name, TableByName) of
case find_registry_entry_by_name(Name, TableByName, TableByPid) of
{_, Pid, _, _, _, _} ->
remove_from_local_table(Name, Pid, TableByName, TableByPid),
%% callback
Expand Down Expand Up @@ -391,8 +383,11 @@ handle_info(Info, #state{scope = Scope} = State) ->
%% Data callbacks
%% ----------------------------------------------------------------------------------------------------------
-spec get_local_data(#state{}) -> {ok, Data :: term()} | undefined.
get_local_data(#state{table_by_name = TableByName}) ->
{ok, get_registry_tuples_for_node(node(), TableByName)}.
get_local_data(#state{
table_by_name = TableByName,
table_by_pid = TableByPid
}) ->
{ok, get_registry_tuples_for_node(node(), TableByName, TableByPid)}.

-spec save_remote_data(RemoteNode :: node(), RemoteData :: term(), #state{}) -> any().
save_remote_data(RemoteNode, RegistryTuplesOfRemoteNode, #state{scope = Scope} = State) ->
Expand All @@ -415,7 +410,7 @@ purge_local_data_for_node(Node, #state{
%% ===================================================================
-spec rebuild_monitors(Scope :: atom(), TableByName :: atom(), TableByPid :: atom()) -> ok.
rebuild_monitors(Scope, TableByName, TableByPid) ->
RegistryTuples = get_registry_tuples_for_node(node(), TableByName),
RegistryTuples = get_registry_tuples_for_node(node(), TableByName, TableByPid),
do_rebuild_monitors(RegistryTuples, #{}, Scope, TableByName, TableByPid).

-spec do_rebuild_monitors(
Expand Down Expand Up @@ -488,21 +483,32 @@ do_register_on_node(Name, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode,
%% return
{reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}.

-spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom()) -> [syn_registry_tuple()].
get_registry_tuples_for_node(Node, TableByName) ->
ets:select(TableByName, [{
{'$1', '$2', '$3', '$4', '_', Node},
-spec get_registry_tuples_for_node(Node :: node(), TableByName :: atom(), TableByPid :: atom()) -> [syn_registry_tuple()].
get_registry_tuples_for_node(Node, TableByName, TableByPid) ->
Entries = ets:select(TableByName, [{
{'$1', '$2', '$3', '_', Node},
[],
[{{'$1', '$2', '$3', '$4'}}]
}]).

-spec find_registry_entry_by_name(Name :: term(), TableByName :: atom()) ->
Entry :: syn_registry_entry() | undefined.
find_registry_entry_by_name(Name, TableByName) ->
case ets:lookup(TableByName, Name) of
[] -> undefined;
[Entry] -> Entry
end.
[{{'$1', '$2', '$3'}}]
}]),
lists:filtermap(fun({Name, Pid, Time}) ->
case ets:lookup(TableByPid, Pid) of
[{Pid, Name, Meta, _, _, _}] -> {true, {Name, Pid, Meta, Time}};
[] -> false
end
end, Entries).
-spec find_registry_entry_by_name(Name :: term(), TableByName :: atom(), TableByPid :: atom()) ->
Entry :: tuple() | undefined.
find_registry_entry_by_name(Name, TableByName, TableByPid) ->
case ets:lookup(TableByName, Name) of
[] -> undefined;
[{Name, Pid, Time, MRef, Node}] ->
case ets:lookup(TableByPid, Pid) of
[{Pid, Name, Meta, _Time, _MRef, _Node}] ->
{Name, Pid, Meta, Time, MRef, Node};
[] ->
undefined
end
end.

-spec find_registry_entries_by_pid(Pid :: pid(), TableByPid :: atom()) -> RegistryEntriesByPid :: [syn_registry_entry_by_pid()].
find_registry_entries_by_pid(Pid, TableByPid) when is_pid(Pid) ->
Expand Down Expand Up @@ -551,7 +557,9 @@ maybe_demonitor(Pid, TableByPid) ->
) -> true.
add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
%% insert
true = ets:insert(TableByName, {Name, Pid, Meta, Time, MRef, node(Pid)}),
%% Remove meta inserion
%%true = ets:insert(TableByName, {Name, Pid, Meta, Time, MRef, node(Pid)}),
true = ets:insert(TableByName, {Name, Pid, Time, MRef, node(Pid)}),
%% since we use a table of type bag, we need to manually ensure that the key Pid, Name is unique
true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}),
true = ets:insert(TableByPid, {Pid, Name, Meta, Time, MRef, node(Pid)}).
Expand All @@ -563,7 +571,7 @@ add_to_local_table(Name, Pid, Meta, Time, MRef, TableByName, TableByPid) ->
TableByPid :: atom()
) -> true.
remove_from_local_table(Name, Pid, TableByName, TableByPid) ->
true = ets:match_delete(TableByName, {Name, Pid, '_', '_', '_', '_'}),
true = ets:match_delete(TableByName, {Name, Pid, '_', '_', '_'}),
true = ets:match_delete(TableByPid, {Pid, Name, '_', '_', '_', '_'}).

-spec update_local_table(
Expand Down Expand Up @@ -599,14 +607,14 @@ purge_registry_for_remote_nodes(Scope, TableByName, TableByPid) ->
-spec purge_registry_for_remote_node(Scope :: atom(), Node :: atom(), TableByName :: atom(), TableByPid :: atom()) -> true.
purge_registry_for_remote_node(Scope, Node, TableByName, TableByPid) when Node =/= node() ->
%% loop elements for callback
RegistryTuples = get_registry_tuples_for_node(Node, TableByName),
RegistryTuples = get_registry_tuples_for_node(node(), TableByName, TableByPid),
lists:foreach(fun({Name, Pid, Meta, _Time}) ->
syn_event_handler:call_event_handler(on_process_unregistered,
[Scope, Name, Pid, Meta, {syn_remote_scope_node_down, Scope, Node}]
)
end, RegistryTuples),
%% remove all from pid table
true = ets:match_delete(TableByName, {'_', '_', '_', '_', '_', Node}),
true = ets:match_delete(TableByName, {'_', '_', '_', '_', Node}),
true = ets:match_delete(TableByPid, {'_', '_', '_', '_', '_', Node}).

-spec reconcile_remote_registry_snapshot(Node :: node(), [syn_registry_tuple()], #state{}) -> ok.
Expand All @@ -616,7 +624,7 @@ reconcile_remote_registry_snapshot(Node, RegistryTuplesOfRemoteNode, #state{
table_by_pid = TableByPid
}) ->
SnapshotNames = ordsets:from_list([Name || {Name, _Pid, _Meta, _Time} <- RegistryTuplesOfRemoteNode]),
ExistingTuples = get_registry_tuples_for_node(Node, TableByName),
ExistingTuples = get_registry_tuples_for_node(node(), TableByName, TableByPid),
lists:foreach(fun({Name, Pid, Meta, _Time}) ->
case ordsets:is_element(Name, SnapshotNames) of
true ->
Expand All @@ -643,7 +651,7 @@ handle_registry_sync(Name, Pid, Meta, Time, Reason, #state{
table_by_name = TableByName,
table_by_pid = TableByPid
} = State) ->
case find_registry_entry_by_name(Name, TableByName) of
case find_registry_entry_by_name(Name, TableByName, TableByPid) of
undefined ->
%% no conflict
add_to_local_table(Name, Pid, Meta, Time, undefined, TableByName, TableByPid),
Expand Down