Skip to content

Commit 555216f

Browse files
committed
fixes for discovery and tpicapi, test are passed
1 parent df848be commit 555216f

6 files changed

Lines changed: 218 additions & 76 deletions

File tree

apps/tpic2/src/tpic2_client.erl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,25 @@ connection_process(Parent, Host, Port, Opts) ->
7474
{ok, Socket} = ssl:connect(TCPSocket, SSLOpts),
7575
ssl:setopts(Socket, [{active, once}]),
7676
{ok,PeerInfo}=ssl:connection_information(Socket),
77+
PeerPK=case ssl:peercert(Socket) of
78+
{ok, PC} ->
79+
DCert=tpic2:extract_cert_info(public_key:pkix_decode_cert(PC,otp)),
80+
case DCert of
81+
#{pubkey:=Der} ->
82+
Der;
83+
_ ->
84+
?LOG_NOTICE("Unknown cert ~p",[DCert]),
85+
undefined
86+
end;
87+
{error, no_peercert} ->
88+
undefined
89+
end,
90+
7791
State=#{
7892
ref=>maps:get(ref, Opts, undefined),
7993
socket=>Socket,
8094
peerinfo=>PeerInfo,
95+
pubkey=>PeerPK,
8196
timer=>undefined,
8297
transport=>ranch_ssl,
8398
parent=>Parent,

apps/tpic2/src/tpic2_tls.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ conn_proto(Parent, Ref, Socket, Transport, Opts, <<"tpic2">>, PeerPK) ->
8181
timer=>undefined,
8282
transport=>Transport,
8383
protocol => tpic2,
84-
peerpk => PeerPK,
84+
pubkey => PeerPK,
8585
nodeid=> try
8686
nodekey:get_pub()
8787
catch _:_ -> atom_to_binary(node(),utf8)
@@ -96,7 +96,7 @@ conn_proto(_Parent, _Ref, Socket, Transport, _Opts, _, _) ->
9696
Transport:close(Socket).
9797

9898
loop1(State=#{socket:=Socket,role:=Role,opts:=Opts,
99-
transport:=Transport,peerpk:=Pubkey}) ->
99+
transport:=Transport,pubkey:=Pubkey}) ->
100100
%{ok,PC}=ssl:peercert(Socket),
101101
%DCert=tpic2:extract_cert_info(public_key:pkix_decode_cert(PC,otp)),
102102
%Pubkey=case DCert of
@@ -166,7 +166,7 @@ loop1(State=#{socket:=Socket,role:=Role,opts:=Opts,
166166
if WhatToDo==shutdown ->
167167
done;
168168
true ->
169-
?MODULE:loop(State#{pubkey=>Pubkey})
169+
?MODULE:loop(State)
170170
end
171171
end.
172172

@@ -318,7 +318,9 @@ handle_msg(#{null:=<<"hello">>,
318318
OldSID
319319
}
320320
end,
321-
{ok, PPID}=gen_server:call(tpic2_cmgr, {peer,PK, Reg}),
321+
{_OkOrExists,PPID}=gen_server:call(tpic2_cmgr, {peer,PK, Reg}),
322+
%logger:notice("RegRes ~p ~p: ~p",[PK, Reg, RegRes]),
323+
%{ok, PPID}=RegRes,
322324
lists:foreach(fun(Addr) ->
323325
gen_server:call(PPID, {add, binary_to_list(Addr), Port})
324326
end,

apps/tpnode/src/discovery.erl

Lines changed: 95 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,16 @@ handle_call({get_pid, Name}, _From, #{local_services:=Dict} = State) when is_bin
165165
end,
166166
{reply, Reply, State};
167167

168-
handle_call({lookup, Pred}, _From, State) when is_function(Pred) ->
169-
{reply, query(Pred, State), State};
170-
171168
%% get list of ip and port for service with name Name (local and remote)
172169
handle_call({lookup, Name}, _From, State) ->
173170
{reply, query(Name, State), State};
174171

175172
handle_call({lookup, Name, Chain}, _From, State) ->
176173
{reply, query(Name, Chain, State), State};
177174

175+
handle_call({lookup_raw, Name, Chain}, _From, State) ->
176+
{reply, query_raw(Name, Chain, State), State};
177+
178178
handle_call({lookup_remote, Name}, _From, #{remote_services := RemoteDict} = State) ->
179179
{reply, query_remote(Name, RemoteDict, blockchain:chain()), State};
180180

@@ -331,7 +331,7 @@ get_local_addresses(State) ->
331331

332332

333333
% --------------------------------------------------------
334-
announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
334+
prepare_announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
335335
try
336336
%% TranslatedAddress = add_hostname(translate_address(Address), Hostname),
337337

@@ -350,16 +350,26 @@ announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
350350
scopes => Scopes,
351351
chain => blockchain:chain()
352352
},
353-
AnnounceBin = pack(Announce),
354-
send_service_announce(AnnounceBin)
353+
{ok, pack(Announce)}
355354
catch
356355
Err:Reason ->
357356
?LOG_ERROR(
358357
"Announce with name ~p and address ~p and scopes ~p hasn't made because ~p ~p",
359358
[Name, TranslatedAddress, Scopes, Err, Reason]
360-
)
359+
),
360+
error
361361
end.
362362

363+
%announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
364+
% case prepare_announce_one_service(Name, TranslatedAddress, Ttl, Scopes) of
365+
% {ok, AnnounceBin} ->
366+
% send_service_announce(AnnounceBin),
367+
% ok;
368+
% error ->
369+
% error
370+
% end.
371+
372+
363373
% ------------------------------------------------------------
364374
-spec is_local_service(Announce :: #{ 'nodeid' := _, _ := _ }) -> boolean().
365375

@@ -418,49 +428,60 @@ get_local_names(Names) ->
418428
% --------------------------------------------------------
419429

420430
% make announce of our local services with tpic scope
421-
make_announce(#{names:=Names} = _Dict, State) ->
422-
?LOG_DEBUG("Announcing our local services"),
431+
prepare_announce(#{names:=Names} = _Dict, State) ->
423432
Ttl = max(get_config(intrachain_ttl, 300, State), 30),
424433
Hostname = application:get_env(tpnode, hostname, unknown),
425-
%% ValidUntil = os:system_time(seconds) + get_config(intrachain_ttl, 120, State),
426434
Addresses = get_config(addresses, get_default_addresses(), State),
427435
AllScopesCfg = get_config(scope, ?DEFAULT_SCOPE_CONFIG, State),
428436
MacroDict = get_config(macro_dict, #{}, State),
429437
LocalNames = get_local_names(Names),
430-
Announcer = fun(Name, Counter) ->
431-
Counter + lists:foldl(
432-
% #{address => local4, port => 53221, proto => tpic}
433-
fun(#{proto := Proto} = Address, AddrCounter) ->
434-
Scopes = get_scopes(Proto, AllScopesCfg),
435-
IsAdvertisable = in_scope(Proto, tpic, AllScopesCfg),
436-
IsRightProto = is_right_proto(Name, Proto),
437-
%% ?LOG_DEBUG("ann dbg ~p ~p ~p ~p", [Name, IsAdvertisable, IsRightProto, Address]),
438-
439-
if
440-
IsRightProto == true andalso IsAdvertisable == true ->
441-
try
442-
TranslatedAddress =
443-
add_hostname(substitute_macro(Address, MacroDict), Hostname),
444-
announce_one_service(Name, TranslatedAddress, Ttl, Scopes),
445-
AddrCounter + 1
446-
catch
447-
pass ->
448-
?LOG_DEBUG("skip address (can't substitute macro?): ~p", [Address]),
449-
AddrCounter
450-
end;
438+
Announcer = fun(Name, ListAcc) ->
439+
lists:foldl(
440+
% #{address => local4, port => 53221, proto => tpic}
441+
fun(#{proto := Proto} = Address, Acc) ->
442+
Scopes = get_scopes(Proto, AllScopesCfg),
443+
IsAdvertisable = in_scope(Proto, tpic, AllScopesCfg),
444+
IsRightProto = is_right_proto(Name, Proto),
445+
if IsRightProto == true andalso IsAdvertisable == true ->
446+
try
447+
TranslatedAddress =
448+
add_hostname(substitute_macro(Address, MacroDict), Hostname),
449+
case prepare_announce_one_service(
450+
Name,
451+
TranslatedAddress,
452+
Ttl,
453+
Scopes) of
454+
{ok, Bin} ->
455+
[Bin|Acc];
456+
error ->
457+
Acc
458+
end
459+
catch
460+
pass ->
461+
?LOG_DEBUG("skip address (can't substitute macro?): ~p", [Address]),
462+
Acc
463+
end;
464+
true ->
465+
Acc
466+
end;
467+
(Address, Acc) ->
468+
?LOG_DEBUG("skip announce for invalid address ~p ~p", [Name, Address]),
469+
Acc
470+
end,
471+
ListAcc,
472+
Addresses)
473+
end,
474+
lists:foldl(Announcer, [], LocalNames).
451475

452-
true ->
453-
%% ?LOG_DEBUG("skip announce for address ~p ~p", [Name, Address]),
454-
AddrCounter
455-
end;
456-
(Address, AddrCounter) ->
457-
?LOG_DEBUG("skip announce for invalid address ~p ~p", [Name, Address]),
458-
AddrCounter
459-
end,
460-
0,
461-
Addresses)
476+
477+
make_announce(Dict, State) ->
478+
?LOG_DEBUG("Announcing our local services"),
479+
List2Announce = prepare_announce(Dict, State),
480+
Announcer = fun(Bin, Cnt) ->
481+
send_service_announce(Bin),
482+
Cnt+1
462483
end,
463-
ServicesCount = lists:foldl(Announcer, 0, LocalNames),
484+
ServicesCount = lists:foldl(Announcer, 0, List2Announce),
464485
?LOG_DEBUG("Announced ~p of our services", [ServicesCount]),
465486
ok.
466487

@@ -756,11 +777,42 @@ query_remote(Name, _Dict, Chain) ->
756777

757778
% --------------------------------------------------------
758779

780+
query_raw(Name0, Chain0, State) ->
781+
Name = convert_to_binary(Name0),
782+
LC=blockchain:chain(),
783+
#{local_services := #{names:=LocalDict}, remote_services := RemoteDict} = State,
784+
{Chain,Local} = if is_integer(Chain0) andalso Chain0>0 andalso Chain0=/=LC ->
785+
{Chain0,[]};
786+
true ->
787+
{LC,
788+
prepare_announce(
789+
#{names=>
790+
maps:with([Name0],LocalDict)
791+
}, State)
792+
}
793+
end,
794+
RQR=fun(RName0, Dict) ->
795+
Name1 = add_chain_to_name(RName0, Chain),
796+
Nodes = maps:get(Name1, Dict, #{}),
797+
Announces = maps:values(Nodes),
798+
lists:map(
799+
fun(#{bin:=B}) ->
800+
%maps:merge(Address, maps:with([nodeid,node_name,pubkey],A))
801+
B
802+
end, Announces
803+
)
804+
end,
805+
806+
lists:merge(Local, RQR(Name,RemoteDict)).
807+
808+
759809
query(Name0, Chain, State) ->
760810
Name = convert_to_binary(Name0),
761811
LocalChain = blockchain:chain(),
762812
#{local_services := LocalDict, remote_services := RemoteDict} = State,
763813
Local = case Chain of
814+
0 ->
815+
query_local(Name, LocalDict, State);
764816
LocalChain ->
765817
query_local(Name, LocalDict, State);
766818
_ ->
@@ -774,11 +826,6 @@ query(Name0, Chain, State) ->
774826

775827
% --------------------------------------------------------
776828

777-
query(Pred, _State) when is_function(Pred) ->
778-
?LOG_ERROR("Not inmplemented"),
779-
not_implemented;
780-
781-
% find service by name
782829
query(Name, State) ->
783830
query(Name, blockchain:chain(), State).
784831

@@ -870,7 +917,7 @@ process_announce(
870917
try
871918
Key = address2key(Address),
872919
Name = add_chain_to_name(Name0, Chain),
873-
Announce = add_valid_until(Announce0, MaxTtl),
920+
Announce = add_valid_until(Announce0#{bin=>AnnounceBin}, MaxTtl),
874921
Nodes = maps:get(Name, Dict, #{}),
875922
PrevAnnounce = maps:get(Key, Nodes, #{created => 0, ttl=> 0, sent_xchain => 0}),
876923
SentXchain = relay_announce(PrevAnnounce, Announce, AnnounceBin, XChainThrottle),

apps/tpnode/src/tpnode_tpicapi.erl

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ init(Req0, _) ->
3030
throw:{return,RCode,RBody} ->
3131
{ok, cowboy_req:reply(RCode, #{}, RBody, Req0), #{} }
3232
end.
33-
33+
34+
%% -- [ pick_block ] --
35+
3436
handle(<<"GET">>, [<<"pick_block">>,Hash], _Req) ->
3537
handle(<<"GET">>, [<<"pick_block">>,Hash,<<"self">>], _Req);
3638

@@ -56,13 +58,36 @@ handle(<<"GET">>, [<<"pick_block">>,THash,TRel], _Req) ->
5658
{404, <<"not found">>}
5759
end;
5860

61+
%% -- [ sync ] --
62+
5963
handle(<<"GET">>,[<<"sync">>,<<"request">>], _Req) ->
6064
{200,
61-
#{<<"content-type">> =><<"binary/msgpack">>},
65+
#{<<"content-type">> =><<"application/msgpack">>},
6266
msgpack:pack(gen_server:call(blockchain_reader,sync_req))
6367
};
6468

65-
handle(<<"GET">>,[<<"txpool">>,TxID], _Req) ->
69+
%% -- [ discovery ] --
70+
71+
handle(<<"GET">>,[<<"discovery">>,<<"tpicpeer">>,_BChain]=Path,Req) ->
72+
case Req of
73+
#{cert:=Cert} when is_binary(Cert) ->
74+
handle(get, Path ,Req);
75+
_ ->
76+
{403, <<"unauth">>}
77+
end;
78+
79+
handle(<<"GET">>,[<<"discovery">>,Service,BChain], Req) ->
80+
handle(get,[<<"discovery">>,Service,BChain], Req);
81+
82+
handle(get,[<<"discovery">>,Service,BChain], _Req) ->
83+
Chain=binary_to_integer(BChain),
84+
List=gen_server:call(discovery,{lookup_raw,Service,Chain}),
85+
Body=msgpack:pack(List),
86+
{200, #{ <<"content-type">> => <<"application/msgpack">> }, Body};
87+
88+
%% -- [ txstorage ] --
89+
90+
handle(<<"GET">>,[<<"txstorage">>,TxID], _Req) ->
6691
case tpnode_txstorage:get_txm(TxID) of
6792
{ok, #{body:=Body,
6893
origin:=Origin,
@@ -79,7 +104,7 @@ handle(<<"GET">>,[<<"txpool">>,TxID], _Req) ->
79104
{404, <<"not found">>}
80105
end;
81106

82-
handle(<<"PATCH">>,[<<"txpool">>,TxID], #{cert:=Cert}=_Req) when is_binary(Cert) ->
107+
handle(<<"PATCH">>,[<<"txstorage">>,TxID], #{cert:=Cert}=_Req) when is_binary(Cert) ->
83108
#{pubkey:=PubKey}=tpic2:extract_cert_info(public_key:pkix_decode_cert(Cert,otp)),
84109
case tpnode_txstorage:get_txm(TxID) of
85110
{ok, #{origin:=Origin, valid:=_Valid}} when Origin==PubKey ->
@@ -100,8 +125,7 @@ handle(<<"PATCH">>,[<<"txpool">>,TxID], #{cert:=Cert}=_Req) when is_binary(Cert)
100125
{404, <<"not found">>}
101126
end;
102127

103-
104-
handle(<<"PUT">>,[<<"txpool">>, TxID], #{cert:=Cert,has_body:=true}=Req) when is_binary(Cert) ->
128+
handle(<<"PUT">>,[<<"txstorage">>, TxID], #{cert:=Cert,has_body:=true}=Req) when is_binary(Cert) ->
105129
{ok, Body, Req1} = cowboy_req:read_body(Req),
106130
#{pubkey:=PubKey}=tpic2:extract_cert_info(public_key:pkix_decode_cert(Cert,otp)),
107131
case gen_server:call(txstorage,
@@ -117,6 +141,46 @@ handle(<<"PUT">>,[<<"txpool">>, TxID], #{cert:=Cert,has_body:=true}=Req) when is
117141
{{400, <<"body_mismatch">>},Req1}
118142
end;
119143

144+
% -- [ tx ] --
145+
146+
handle(<<"PUT">>, [<<"tx">>,<<"multi">>], #{has_body := true} = Req) ->
147+
{ok, Body, Req1} = cowboy_req:read_body(Req),
148+
{ case maps:get(<<"content-type">>,Req1,undefined) of
149+
<<"application/msgpack">> ->
150+
{ok,Lst} = msgpack:unpack(Body),
151+
if(is_list(Lst)) -> ok;
152+
true -> throw({return,400,<<"nolist">>})
153+
end,
154+
Res=lists:map(
155+
fun(Bin) when is_binary(Bin) ->
156+
case txpool:new_tx(Bin) of
157+
{ok, TxID} ->
158+
TxID;
159+
{error, Err} ->
160+
[<<"error">>,
161+
iolist_to_binary(io_lib:format("~p", [Err]))
162+
]
163+
end
164+
end, Lst),
165+
{200, #{<<"content-type">> =><<"application/msgpack">>}, msgpack:pack(Res) };
166+
_ ->
167+
{400, <<"unexpected content-type">>}
168+
end, Req};
169+
170+
%
171+
handle(<<"PUT">>, [<<"tx">>], #{has_body := true} = Req) ->
172+
{ok, Body, Req1} = cowboy_req:read_body(Req),
173+
{ case txpool:new_tx(Body) of
174+
{ok, TxID} ->
175+
{200,TxID};
176+
{error, Err} ->
177+
{500,
178+
iolist_to_binary(io_lib:format("~p", [Err]))
179+
}
180+
end, Req1};
181+
182+
% -- [ status ] --
183+
120184
handle(<<"GET">>,[<<"status">>], #{cert:=Cert}) when is_binary(Cert) ->
121185
#{pubkey:=PubKey}=tpic2:extract_cert_info(public_key:pkix_decode_cert(Cert,otp)),
122186
{200,#{

0 commit comments

Comments
 (0)