diff --git a/include/leveled.hrl b/include/leveled.hrl
index cb645aa1..2e9ebb38 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -35,7 +35,7 @@
 -define(MAX_SSTSLOTS, 256).
 -define(MAX_MERGEBELOW, 24).
 -define(LOADING_PAUSE, 1000).
--define(LOADING_BATCH, 1000).
+-define(LOADING_BATCH, 200).
 -define(CACHE_SIZE_JITTER, 25).
 -define(JOURNAL_SIZE_JITTER, 20).
 -define(LONG_RUNNING, 1000000).
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index c7a8014e..ae0036c9 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -756,6 +756,8 @@ delete_pending({call, From}, cdb_close, State) ->
         State#state.filename,
         State#state.waste_path),
     {stop_and_reply, normal, [{reply, From, ok}]};
+delete_pending({call, From}, Event, State) ->
+    handle_sync_event(Event, From, State);
 delete_pending(cast, delete_confirmed, State=#state{delete_point=ManSQN}) ->
     leveled_log:log(cdb04, [State#state.filename, ManSQN]),
     close_pendingdelete(State#state.handle,
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 6e381518..3880e6ce 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -558,10 +558,13 @@ handle_call({fold,
     Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
     Folder =
         fun() ->
-            fold_from_sequence(StartSQN,
-                               {FilterFun, InitAccFun, FoldFun},
-                               Acc,
-                               Manifest)
+            fold_from_sequence(
+                StartSQN,
+                State#state.journal_sqn,
+                {FilterFun, InitAccFun, FoldFun},
+                Acc,
+                Manifest
+            )
         end,
     case By of
         as_ink ->
@@ -1211,8 +1214,12 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
 
 
 
--spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list())
-    -> any().
+-spec fold_from_sequence(
+    non_neg_integer(),
+    pos_integer(),
+    {fun(), fun(), fun()},
+    any(),
+    list()) -> any().
 %% @doc
 %%
 %% Scan from the starting sequence number to the end of the Journal. Apply
@@ -1226,62 +1233,79 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
-%% over in batches using foldfile_between_sequence/7. The batch is a range of
+%% over in batches using foldfile_between_sequence/8. The batch is a range of
 %% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted
 %% files
-fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
+fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) ->
     Acc;
-fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
+fold_from_sequence(MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
         when LowSQN >= MinSQN ->
-    {NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
-                                                   MinSQN + ?LOADING_BATCH,
-                                                   FoldFuns,
-                                                   Acc,
-                                                   Pid,
-                                                   undefined,
-                                                   FN),
-    fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
-fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
+    {NextMinSQN, Acc0} =
+        foldfile_between_sequence(
+            MinSQN,
+            MinSQN + ?LOADING_BATCH,
+            JournalSQN,
+            FoldFuns,
+            Acc,
+            Pid,
+            undefined,
+            FN
+        ),
+    fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest);
+fold_from_sequence(
+    MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
    % If this file has a LowSQN less than the minimum, we can skip it if the
    % next file also has a LowSQN below the minimum
    {NextMinSQN, Acc0} =
        case Rest of
            [] ->
-                foldfile_between_sequence(MinSQN,
-                                          MinSQN + ?LOADING_BATCH,
-                                          FoldFuns,
-                                          Acc,
-                                          Pid,
-                                          undefined,
-                                          FN);
+                foldfile_between_sequence(
+                    MinSQN,
+                    MinSQN + ?LOADING_BATCH,
+                    JournalSQN,
+                    FoldFuns,
+                    Acc,
+                    Pid,
+                    undefined,
+                    FN
+                );
            [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
-                foldfile_between_sequence(MinSQN,
-                                          MinSQN + ?LOADING_BATCH,
-                                          FoldFuns,
-                                          Acc,
-                                          Pid,
-                                          undefined,
-                                          FN);
+                foldfile_between_sequence(
+                    MinSQN,
+                    MinSQN + ?LOADING_BATCH,
+                    JournalSQN,
+                    FoldFuns,
+                    Acc,
+                    Pid,
+                    undefined,
+                    FN
+                );
            _ ->
                {MinSQN, Acc}
        end,
-    fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
+    fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest).
 
 
-foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
-                          Acc, CDBpid, StartPos, FN) ->
+foldfile_between_sequence(
+    MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) ->
     {FilterFun, InitAccFun, FoldFun} = FoldFuns,
     InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
     case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
         {eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
             {AccMinSQN, FoldFun(BatchAcc, Acc)};
+        {_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}}
+                when AccMinSQN >= JournalSQN ->
+            {AccMinSQN, FoldFun(BatchAcc, Acc)};
         {LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
             UpdAcc = FoldFun(BatchAcc, Acc),
             NextSQN = MaxSQN + 1,
-            foldfile_between_sequence(NextSQN,
-                                      NextSQN + ?LOADING_BATCH,
-                                      FoldFuns,
-                                      UpdAcc,
-                                      CDBpid,
-                                      LastPosition,
-                                      FN)
+            foldfile_between_sequence(
+                NextSQN,
+                NextSQN + ?LOADING_BATCH,
+                JournalSQN,
+                FoldFuns,
+                UpdAcc,
+                CDBpid,
+                LastPosition,
+                FN
+            )
     end.
 
 
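Note on the leveled_inker change above: fold_from_sequence/5 now carries the journal's head SQN, captured when the fold snapshot was taken, and foldfile_between_sequence/8 stops scheduling further batches once a batch's minimum SQN reaches that head. Together with ?LOADING_BATCH dropping from 1000 to 200, a sqn_order fold over the active journal terminates promptly instead of chasing a writer that keeps appending. A minimal sketch of that termination rule, with a plain list standing in for the CDB file (the module and function names here are illustrative only, not leveled's API):

```erlang
%% Illustrative sketch only - fold_sketch and fold_to_journal_head/3 are
%% hypothetical names; a plain list of entries stands in for a CDB file.
-module(fold_sketch).
-export([fold_to_journal_head/3]).

-define(LOADING_BATCH, 200).

%% Entries is a list of {SQN, Value} pairs; JournalSQN is the head SQN
%% captured when the snapshot was taken. Batches are ranges of SQNs, so a
%% batch may contain far fewer than ?LOADING_BATCH objects after compaction.
fold_to_journal_head(Entries, JournalSQN, FoldFun) ->
    fold_batches(1, Entries, JournalSQN, FoldFun, []).

fold_batches(MinSQN, Entries, JournalSQN, FoldFun, Acc) ->
    MaxSQN = MinSQN + ?LOADING_BATCH,
    Batch = [V || {SQN, V} <- Entries, SQN >= MinSQN, SQN =< MaxSQN],
    Acc0 = FoldFun(Batch, Acc),
    case MaxSQN + 1 of
        NextSQN when NextSQN >= JournalSQN ->
            %% Mirrors the new AccMinSQN >= JournalSQN clause in
            %% foldfile_between_sequence/8: no further batch is scheduled
            %% once the head is reached, so the fold terminates even while
            %% a writer keeps appending (the real fold also stops at eof).
            Acc0;
        NextSQN ->
            fold_batches(NextSQN, Entries, JournalSQN, FoldFun, Acc0)
    end.
```

For example, with JournalSQN = 250 the sketch folds the 1-201 batch, then the 202-402 batch, and then stops; entries appended at SQN 403 or beyond are never visited.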
diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl
index 410217eb..816c76c0 100644
--- a/src/leveled_pclerk.erl
+++ b/src/leveled_pclerk.erl
@@ -389,7 +389,7 @@ do_merge(
 add_entry(empty, FileName, _TS1, Additions) ->
     leveled_log:log(pc013, [FileName]),
-    {[], [], Additions};
+    {Additions, [], []};
 add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
     {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
     Entry =
diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl
index 4c5cbab8..ecccdc04 100644
--- a/test/end_to_end/riak_SUITE.erl
+++ b/test/end_to_end/riak_SUITE.erl
@@ -11,6 +11,8 @@
             fetchclocks_modifiedbetween/1,
             crossbucket_aae/1,
             handoff/1,
+            handoff_close/1,
+            handoff_withcompaction/1,
             dollar_bucket_index/1,
             dollar_key_index/1,
             bigobject_memorycheck/1,
@@ -23,6 +25,8 @@ all() -> [
             fetchclocks_modifiedbetween,
             crossbucket_aae,
             handoff,
+            handoff_close,
+            handoff_withcompaction,
             dollar_bucket_index,
             dollar_key_index,
             bigobject_memorycheck,
@@ -1633,6 +1637,149 @@ dollar_key_index(_Config) ->
     ok = leveled_bookie:book_close(Bookie1),
     testutil:reset_filestructure().
 
+handoff_close(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    KeyCount = 500000,
+    Bucket = {<<"BType">>, <<"BName">>},
+    StartOpts1 =
+        [
+            {root_path, RootPath},
+            {max_journalobjectcount, KeyCount + 1},
+            {max_pencillercachesize, 12000},
+            {sync_strategy, testutil:sync_strategy()}
+        ],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    ObjList1 =
+        testutil:generate_objects(
+            KeyCount div 10,
+            {fixed_binary, 1}, [],
+            leveled_rand:rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    ObjList2 =
+        testutil:generate_objects(
+            KeyCount - (KeyCount div 10),
+            {fixed_binary, KeyCount div 10 + 1}, [],
+            leveled_rand:rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    testutil:riakload(Bookie1, ObjList1),
+    FoldObjectsFun =
+        fun(_, _, _, Acc) ->
+            [os:timestamp()|Acc]
+        end,
+    {async, Runner} =
+        leveled_bookie:book_objectfold(
+            Bookie1,
+            ?RIAK_TAG,
+            {FoldObjectsFun, []},
+            true,
+            sqn_order
+        ),
+    testutil:riakload(Bookie1, ObjList2),
+    TSList = Runner(),
+    QueryCompletionTime = os:timestamp(),
+    LastTS = hd(TSList),
+    io:format(
+        "Found ~w objects with Last TS ~w completion time ~w~n",
+        [length(TSList), LastTS, QueryCompletionTime]
+    ),
+    true = KeyCount div 10 == length(TSList),
+    TimeSinceLastObjectTouchedMS =
+        timer:now_diff(QueryCompletionTime, LastTS) div 1000,
+    true = TimeSinceLastObjectTouchedMS < 1000,
+    leveled_bookie:book_destroy(Bookie1),
+    testutil:reset_filestructure().
+
+
+handoff_withcompaction(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    KeyCount = 100000,
+    Bucket = {<<"BType">>, <<"BName">>},
+    StartOpts1 =
+        [
+            {root_path, RootPath},
+            {max_journalobjectcount, KeyCount div 4},
+            {max_pencillercachesize, 12000},
+            {sync_strategy, testutil:sync_strategy()},
+            {max_run_length, 4}
+        ],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    ObjList1 =
+        testutil:generate_objects(
+            KeyCount div 4,
+            {fixed_binary, 1}, [],
+            crypto:strong_rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    testutil:riakload(Bookie1, ObjList1),
+    ObjList2 =
+        testutil:generate_objects(
+            KeyCount div 4,
+            {fixed_binary, (KeyCount div 4) + 1}, [],
+            crypto:strong_rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    testutil:riakload(Bookie1, ObjList2),
+    ObjList3 =
+        testutil:generate_objects(
+            KeyCount div 4,
+            {fixed_binary, (KeyCount div 4) * 2 + 1}, [],
+            crypto:strong_rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    testutil:riakload(Bookie1, ObjList3),
+    ObjList4 =
+        testutil:generate_objects(
+            KeyCount div 4,
+            {fixed_binary, (KeyCount div 4) * 3 + 1}, [],
+            crypto:strong_rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    testutil:riakload(Bookie1, ObjList4),
+    % Now update some objects to prompt compaction
+    testutil:update_some_objects(Bookie1, ObjList1, KeyCount div 8),
+    testutil:update_some_objects(Bookie1, ObjList2, KeyCount div 8),
+    testutil:update_some_objects(Bookie1, ObjList3, KeyCount div 8),
+    testutil:update_some_objects(Bookie1, ObjList4, KeyCount div 8),
+
+    % Setup a handoff-style fold to snapshot journal
+    FoldObjectsFun =
+        fun(_, K, _, Acc) ->
+            [K|Acc]
+        end,
+    {async, Runner} =
+        leveled_bookie:book_objectfold(
+            Bookie1,
+            ?RIAK_TAG,
+            {FoldObjectsFun, []},
+            true,
+            sqn_order
+        ),
+
+    % Now compact the journal, twice to be sure
+    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
+    testutil:wait_for_compaction(Bookie1),
+    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
+    testutil:wait_for_compaction(Bookie1),
+
+    % Run the fold - some cdb files should now be delete_pending
+    {TC0, Results} = timer:tc(Runner),
+    io:format(
+        "Found ~w objects in ~w ms~n",
+        [length(Results), TC0 div 1000]
+    ),
+    true = KeyCount == length(Results),
+    leveled_bookie:book_destroy(Bookie1),
+    testutil:reset_filestructure().
+
+
 %% @doc test that the riak specific $bucket indexes can be iterated
 %% using leveled's existing folders
 dollar_bucket_index(_Config) ->
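Note on the leveled_cdb change at the top of this diff: it is what makes the handoff_withcompaction scenario above pass. In the delete_pending state only cdb_close was matched among sync calls, so there was no clause to serve a cdb_scan issued by a snapshot fold against a file that compaction had already marked delete_pending; the new catch-all clause forwards any other {call, From} event to the shared handle_sync_event, so the fold can keep reading the file for as long as it holds it open. A standalone gen_statem sketch of that pattern (hypothetical module, not leveled code):

```erlang
%% Hypothetical module illustrating the catch-all pattern; not leveled code.
-module(pending_reads).
-behaviour(gen_statem).

-export([start_link/1, read/1, retire/1, confirm_delete/1]).
-export([init/1, callback_mode/0, active/3, delete_pending/3]).

start_link(Value) -> gen_statem:start_link(?MODULE, Value, []).
read(Pid) -> gen_statem:call(Pid, read).
retire(Pid) -> gen_statem:cast(Pid, retire).
confirm_delete(Pid) -> gen_statem:cast(Pid, delete_confirmed).

init(Value) -> {ok, active, Value}.
callback_mode() -> state_functions.

active(cast, retire, Value) ->
    {next_state, delete_pending, Value};
active({call, From}, Event, Value) ->
    handle_sync_event(Event, From, Value).

%% State-specific events are matched first...
delete_pending(cast, delete_confirmed, _Value) ->
    {stop, normal};
%% ...and the catch-all keeps answering sync calls while deletion is
%% pending, rather than failing on an unmatched call event.
delete_pending({call, From}, Event, Value) ->
    handle_sync_event(Event, From, Value).

%% Shared handler for sync calls, usable from any state.
handle_sync_event(read, From, Value) ->
    {keep_state_and_data, [{reply, From, {ok, Value}}]}.
```

With this shape, gen_statem:call(Pid, read) is answered in either state, while the retire and confirm_delete casts drive the shutdown; in leveled the forwarded events include cdb_scan, which is exactly what the sqn_order object fold used for handoff issues.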