Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/ecoinpool/src/btc_coindaemon.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ post_workunit(PID) ->
gen_server:cast(PID, post_workunit).

send_result(PID, BData) ->
gen_server:call(PID, {send_result, BData}).
gen_server:call(PID, {send_result, BData}, 30000).

get_first_tx_with_branches(PID, Workunit) ->
gen_server:call(PID, {get_first_tx_with_branches, Workunit}).
Expand Down
2 changes: 1 addition & 1 deletion apps/ecoinpool/src/btc_daemon_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ get_default_payout_address(URL, Auth) ->
end.

get_block_number(URL, Auth) ->
{ok, "200", _ResponseHeaders, ResponseBody} = ecoinpool_util:send_http_req(URL, Auth, "{\"method\":\"getblocknumber\"}"),
{ok, "200", _ResponseHeaders, ResponseBody} = ecoinpool_util:send_http_req(URL, Auth, "{\"method\":\"getblockcount\"}"),
{Body} = ejson:decode(ResponseBody),
proplists:get_value(<<"result">>, Body) + 1.

Expand Down
242 changes: 189 additions & 53 deletions apps/ecoinpool/src/couchdb_sharelogger.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

%%
%% Copyright (C) 2011 Patrick "p2k" Schneider <patrick.p2k.schneider@gmail.com>
%%
Expand Down Expand Up @@ -29,11 +28,42 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {
candidates_only,
conn,
known_dbs
subpool_id :: binary() | any,
chain_logging :: main | aux | both,
log_type :: round | interval | both,
shares_db :: term(),
cachetbl :: ets:tid(),
cachetbln :: ets:tid(),
next_update :: erlang:timestamp()
}).

-record(worker_last, {
ip :: string(),
user_agent :: string(),
round :: integer() | undefined
}).

-record(subpool_last, {
block_num :: integer() | undefined,
prev_block :: binary() | undefined,
target :: binary() | undefined,
round :: integer() | undefined
}).

-record(entry, {
id :: {binary(), main|aux} | {binary(), binary(), main|aux},
valids = 0 :: integer(),
invalids = 0 :: integer(),
other_reasons = [] :: [{reject_reason(), integer()}],
last :: #worker_last{} | #subpool_last{},
timestamp :: erlang:timestamp()
}).

% This value cannot be changed at runtime, since it would mess up previous statistics
-define(LOG_INTERVAL, 10).

-define(GREGORIAN_SECONDS_1970, 62167219200).

%% ===================================================================
%% API functions
%% ===================================================================
Expand All @@ -44,14 +74,46 @@ start_link(LoggerId, Config) ->
log_share(LoggerId, Share) ->
gen_server:cast(LoggerId, Share).

%this function was called but it doesn't exist.
datetime_to_now(DateTime) ->
GSeconds = calendar:datetime_to_gregorian_seconds(DateTime),
ESeconds = GSeconds - ?GREGORIAN_SECONDS_1970,
{ESeconds div 1000000, ESeconds rem 1000000, 0}.

%% ===================================================================
%% Gen_Server callbacks
%% ===================================================================

init(Config) ->
CandidatesOnly = proplists:get_value(candidate_shares_only, Config, false),
% Trap exit
process_flag(trap_exit, true),
% Load settings
S = ecoinpool_db:get_couchdb_connection(),
{ok, #state{candidates_only=CandidatesOnly, conn=S, known_dbs=sets:new()}}.
DatabaseName = binary_to_list(proplists:get_value(database, Config, <<"shares">>)),
SubpoolId = proplists:get_value(subpool_id, Config, any),
ChainLogging = case proplists:get_value(chain_logging, Config, <<"main">>) of
<<"aux">> -> aux;
<<"both">> -> both;
_ -> main
end,
LogType = case proplists:get_value(log_type, Config, <<"both">>) of
<<"round">> -> round;
<<"interval">> -> interval;
_ -> both
end,
% Create in-memory cache
CacheTbl = ets:new(cachetbl, [set, protected, {keypos, #entry.id}]),
CacheTblN = ets:new(cachetbln, [set, protected, {keypos, #entry.id}]),
% Setup database
{ok, SharesDB} = setup_shares_db(S, DatabaseName),
% Setup interval logging
NextUpdate = if LogType =:= interval; LogType =:= both ->
% Start update timer
timer:send_interval(5000, check_update), % Set to 5 seconds to allow some randomization and avoid possible update conflicts
% Find next update time
get_next_update_ts();
true -> undefined end,
{ok, #state{subpool_id=SubpoolId, chain_logging=ChainLogging, log_type=LogType, shares_db=SharesDB, cachetbl=CacheTbl, cachetbln=CacheTblN, next_update=NextUpdate}}.

handle_call(_, _From, State) ->
{reply, error, State}.
Expand All @@ -60,7 +122,7 @@ handle_cast(#share{
timestamp=Timestamp,
server_id=local,

subpool_name=SubpoolName,
subpool_id=SubpoolId,
worker_id=WorkerId,
user_id=UserId,
ip=IP,
Expand All @@ -82,54 +144,41 @@ handle_cast(#share{
aux_block_num=AuxBlockNum,
aux_prev_block=AuxPrevBlock,
aux_round=AuxRound},

SState=#state{
candidates_only=CandidatesOnly,
conn=S,
known_dbs=KnownDBs}) ->
subpool_id=FilterSubpoolId,
chain_logging=ChainLogging,
log_type=LogType,
shares_db=SharesDB,
cachetbl=CacheTbl,
cachetbln=CacheTblN,
next_update=NextUpdate
}) when
(SubpoolId =:= FilterSubpoolId) or (FilterSubpoolId =:= any) ->

KnownDBs1 = case sets:is_element(SubpoolName, KnownDBs) of
true -> KnownDBs;
false -> setup_shares_db(S, SubpoolName), sets:add_element(SubpoolName, KnownDBs)
Tbl = case timer:now_diff(Timestamp, NextUpdate) of
D when D < 0 -> CacheTbl;
_ -> CacheTblN
end,
{ok, DB} = couchbeam:open_db(S, SubpoolName),
if
not CandidatesOnly; State =:= candidate ->
case State of
invalid ->
store_invalid_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, RejectReason, Hash, undefined, Target, BlockNum, PrevBlock, Round, DB);
_ ->
store_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, State, Hash, undefined, Target, BlockNum, PrevBlock, Data, Round, DB)
end;
true ->
ok
end,
case AuxpoolName of
undefined ->
{noreply, SState#state{known_dbs=KnownDBs1}};
_ ->
KnownDBs2 = case sets:is_element(AuxpoolName, KnownDBs) of
true -> KnownDBs1;
false -> setup_shares_db(S, AuxpoolName), sets:add_element(SubpoolName, KnownDBs1)
end,
{ok, AuxDB} = couchbeam:open_db(S, AuxpoolName),
if
not CandidatesOnly; AuxState =:= candidate ->
case AuxState of
invalid ->
store_invalid_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, RejectReason, AuxHash, Hash, AuxTarget, AuxBlockNum, AuxPrevBlock, AuxRound, AuxDB);
_ ->
store_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, AuxState, AuxHash, Hash, AuxTarget, AuxBlockNum, AuxPrevBlock, Data, AuxRound, AuxDB)
end;
true ->
ok
end,
{noreply, SState#state{known_dbs=KnownDBs2}}
end;

handle_cast(#share{}, State) -> % Ignore other shares (currently these are remote shares)

% Handle for main chain
if ChainLogging =:= main; ChainLogging =:= both ->
update_subpool(Tbl, SubpoolId, main, Timestamp, State, RejectReason, BlockNum, PrevBlock, Target, Round),
update_worker(Tbl, SubpoolId, WorkerId, main, Timestamp, State, RejectReason, IP, UserAgent, Round);
true -> ok end,
% Handle for aux chain
if (ChainLogging =:= aux) or (ChainLogging =:= both), AuxState =/= undefined ->
AuxRejectReason = if AuxState =:= invalid, RejectReason =:= undefined -> stale; true -> RejectReason end,
update_subpool(Tbl, SubpoolId, aux, Timestamp, AuxState, AuxRejectReason, AuxBlockNum, AuxPrevBlock, AuxTarget, AuxRound),
update_worker(Tbl, SubpoolId, WorkerId, aux, Timestamp, AuxState, AuxRejectReason, IP, UserAgent, AuxRound);
true -> ok end,
{noreply, State};

handle_cast(#share{}, State) -> % Ignore other shares
{noreply, State}.

handle_info(_, State) ->
handle_info(check_update, State) ->
%handle_info(_, State) -> was defined twice, I'm pointing out, not sure what to do with it.
{noreply, State}.

terminate(_, _) ->
Expand All @@ -142,20 +191,107 @@ code_change(_OldVsn, State, _Extra) ->
%% Other functions
%% ===================================================================

setup_shares_db(S, SubpoolName) ->
case couchbeam:open_or_create_db(S, SubpoolName) of
setup_shares_db(S, DatabaseName) ->
case couchbeam:open_or_create_db(S, DatabaseName) of
{ok, DB} ->
lists:foreach(fun ecoinpool_db:check_design_doc/1, [
{DB, "stats", "shares_db_stats.json"},
{DB, "timed_stats", "shares_db_timed_stats.json"},
{DB, "auth", "shares_db_auth.json"}
]),
ok;
{ok, DB};
{error, Error} ->
log4erl:error(db, "shares_db - couchbeam:open_or_create_db/3 returned an error:~n~p", [Error]),
error
end.

get_next_update_ts() ->
{Date, Time} = calendar:now_to_datetime(erlang:now()),
Secs = calendar:time_to_seconds(Time),
NextDateTime = case ecoinpool_util:ceiling(Secs / (60 * ?LOG_INTERVAL)) * (60 * ?LOG_INTERVAL) of
86400 ->
Days = calendar:date_to_gregorian_days(Date),
{calendar:gregorian_days_to_date(Days + 1), {0,0,0}};
NextSecs ->
{Date, calendar:seconds_to_time(NextSecs)}
end,
datetime_to_now(NextDateTime).

update_subpool(Tbl, SubpoolId, Chain, Timestamp, State, RejectReason, BlockNum, PrevBlock, Target, Round) ->
Entry = case ets:lookup(Tbl, {SubpoolId, Chain}) of
[] -> #entry{id={SubpoolId, Chain}};
[E] -> E
end,
SubpoolLast = update_subpool_last(Entry#entry.last, BlockNum, PrevBlock, Target, Round),
ets:insert(Tbl, update_entry(Entry, Timestamp, State, RejectReason, SubpoolLast)).

update_worker(Tbl, SubpoolId, WorkerId, Chain, Timestamp, State, RejectReason, IP, UserAgent, Round) ->
Entry = case ets:lookup(Tbl, {SubpoolId, WorkerId, Chain}) of
[] -> #entry{id={SubpoolId, WorkerId, Chain}};
[E] -> E
end,
WorkerLast = update_worker_last(Entry#entry.last, IP, UserAgent, Round),
ets:insert(Tbl, update_entry(Entry, Timestamp, State, RejectReason, WorkerLast)).

update_entry(Entry=#entry{valids=Valids, invalids=Invalids}, Timestamp, State, RejectReason, Last) ->
case State of
invalid ->
case RejectReason of
stale ->
Entry#entry{
invalids=Invalids + 1,
last=Last,
timestamp=Timestamp
};
_ ->
OtherReasons = Entry#entry.other_reasons,
OtherReasonCount = case lists:keyfind(RejectReason, 1, OtherReasons) of
false -> 0;
{_, C} -> C
end,
Entry#entry{
invalids=Invalids + 1,
other_reasons=lists:keystore(RejectReason, 1, OtherReasons, {RejectReason, OtherReasonCount + 1}),
last=Last,
timestamp=Timestamp
}
end;
_ -> % valid | candidate
Entry#entry{
valids=Valids + 1,
last=Last,
timestamp=Timestamp
}
end.

update_subpool_last(undefined, BlockNum, PrevBlock, Target, Round) ->
#subpool_last{
block_num=BlockNum,
prev_block=PrevBlock,
target=Target,
round=Round
};
update_subpool_last(#subpool_last{block_num=LastBlockNum, prev_block=LastPrevBlock, target=LastTarget, round=LastRound}, BlockNum, PrevBlock, Target, Round) ->
#subpool_last{
block_num=case BlockNum of undefined -> LastBlockNum; _ -> BlockNum end,
prev_block=case PrevBlock of undefined -> LastPrevBlock; _ -> PrevBlock end,
target=case Target of undefined -> LastTarget; _ -> Target end,
round=case Round of undefined -> LastRound; _ -> Round end
}.

update_worker_last(undefined, IP, UserAgent, Round) ->
#worker_last{
ip=IP,
user_agent=UserAgent,
round=Round
};
update_worker_last(#worker_last{ip=LastIP, user_agent=LastUserAgent, round=LastRound}, IP, UserAgent, Round) ->
#worker_last{
ip=case IP of undefined -> LastIP; _ -> IP end,
user_agent=case UserAgent of undefined -> LastUserAgent; _ -> UserAgent end,
round=case Round of undefined -> LastRound; _ -> Round end
}.

-spec make_share_document(Timestamp :: erlang:timestamp(), WorkerId :: binary(), UserId :: term(), IP :: string(), UserAgent :: string(), State :: valid | candidate, Hash :: binary(), ParentHash :: binary() | undefined, Target :: binary(), BlockNum :: integer(), PrevBlock :: binary(), BData :: binary(), Round :: integer()) -> {[]}.
make_share_document(Timestamp, WorkerId, UserId, IP, UserAgent, State, Hash, ParentHash, Target, BlockNum, PrevBlock, BData, Round) ->
{{YR,MH,DY}, {HR,ME,SD}} = calendar:now_to_datetime(Timestamp),
Expand Down
2 changes: 1 addition & 1 deletion apps/ecoinpool/src/ecoinpool_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ get_worker_record(WorkerId) ->

-spec get_workers_for_subpools(SubpoolIds :: [binary()]) -> [worker()].
get_workers_for_subpools(SubpoolIds) ->
gen_server:call(?MODULE, {get_workers_for_subpools, SubpoolIds}).
gen_server:call(?MODULE, {get_workers_for_subpools, SubpoolIds}, 10000).

-spec set_subpool_round(Subpool :: subpool(), Round :: integer()) -> ok.
set_subpool_round(#subpool{id=SubpoolId}, Round) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/ecoinpool/src/ecoinpool_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ headers_from_options(Options) ->
(longpolling, AccHeaders) ->
[{"X-Long-Polling", "/LP"} | AccHeaders];
(rollntime, AccHeaders) ->
[{"X-Roll-NTime", "expire=10"} | AccHeaders];
[{"X-Roll-NTime", "expire=120"} | AccHeaders];
({reject_reason, Reason}, AccHeaders) ->
[{"X-Reject-Reason", Reason} | AccHeaders];
({block_num, BlockNum}, AccHeaders) ->
Expand Down
12 changes: 11 additions & 1 deletion apps/ecoinpool/src/ecoinpool_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
make_json_password/3,

daemon_storage_dir/2,
server_storage_dir/1
server_storage_dir/1,

ceiling/1
]).

-on_load(module_init/0).
Expand Down Expand Up @@ -244,3 +246,11 @@ server_storage_dir(SubpoolId) ->
_ -> file:make_dir(Dirname)
end,
Dirname.

ceiling(X) ->
T = erlang:trunc(X),
case (X - T) of
Neg when Neg < 0 -> T;
Pos when Pos > 0 -> T + 1;
_ -> T
end.
2 changes: 1 addition & 1 deletion apps/ecoinpool/src/mysql_sharelogger.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ disconnect(Conn) ->
end.

fetch_result(Conn, Query) ->
case mysql_conn:fetch(Conn, iolist_to_binary(Query), self(), 10000) of
case mysql_conn:fetch(Conn, iolist_to_binary(Query), self(), 30000) of
{data, MyFieldsResult} -> {ok, mysql:get_result_rows(MyFieldsResult)};
{updated, MyUpdateResult} -> {ok, mysql:get_result_affected_rows(MyUpdateResult)};
{error, #mysql_result{error=Reason}} -> {error, Reason};
Expand Down
Loading