From b752e2c9ea90528a100b1203056a280b7c040e0b Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 15 Jan 2025 21:00:17 +0000 Subject: [PATCH] Mas d34 leveled.i465 stopfold (#467) * Test and fix - issue with folding beyond JournalSQN Test previously fails, as even on a fast machine the fold goes on for 5s beyond the last object found. With change to reduce batch size, and stop when batch goes beyond JournalSQN - success with << 100ms spent folding after the last object discovered * Wait after suite for delete_pending to close https://github.com/martinsumner/leveled/issues/462 * Avoid processing key changes in object fold runner As the key changes are going to be discarded --- include/leveled.hrl | 2 +- src/leveled_codec.erl | 1 + src/leveled_inker.erl | 104 ++++++++++++++++++++------------- src/leveled_runner.erl | 18 +++--- test/end_to_end/riak_SUITE.erl | 59 +++++++++++++++++++ test/end_to_end/testutil.erl | 3 +- 6 files changed, 135 insertions(+), 52 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 5244e380..e6596330 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -35,7 +35,7 @@ -define(MAX_SSTSLOTS, 256). -define(MAX_MERGEBELOW, 24). -define(LOADING_PAUSE, 1000). --define(LOADING_BATCH, 1000). +-define(LOADING_BATCH, 200). -define(CACHE_SIZE_JITTER, 25). -define(JOURNAL_SIZE_JITTER, 20). -define(LONG_RUNNING, 1000000). 
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 517f9c60..81797993 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -46,6 +46,7 @@ maybe_compress/2, create_value_for_journal/3, revert_value_from_journal/1, + revert_value_from_journal/2, generate_ledgerkv/5, get_size/2, get_keyandobjhash/2, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 9721267a..8d129c02 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -565,7 +565,8 @@ handle_call({fold, Folder = fun() -> fold_from_sequence( - StartSQN, + StartSQN, + State#state.journal_sqn, {FilterFun, InitAccFun, FoldFun}, Acc, Manifest @@ -1240,8 +1241,12 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> --spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list()) - -> any(). +-spec fold_from_sequence( + non_neg_integer(), + pos_integer(), + {fun(), fun(), fun()}, + any(), + list()) -> any(). %% @doc %% %% Scan from the starting sequence number to the end of the Journal. Apply @@ -1249,71 +1254,88 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> %% objects - and then apply the FoldFun to the batch once the batch is %% complete %% -%% Inputs - MinSQN, FoldFuns, OverallAccumulator, Inker's Manifest +%% Inputs - MinSQN, JournalSQN, FoldFuns, OverallAccumulator, Inker's Manifest %% %% The fold loops over all the CDB files in the Manifest. Each file is looped %% over in batches using foldfile_between_sequence/7. 
The batch is a range of %% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted %% files -fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) -> +fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) -> Acc; -fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest]) - when LowSQN >= MinSQN -> - {NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Acc, - Pid, - undefined, - FN), - fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest); -fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> +fold_from_sequence( + MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest]) + when LowSQN >= MinSQN -> + {NextMinSQN, Acc0} = + foldfile_between_sequence( + MinSQN, + MinSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + Acc, + Pid, + undefined, + FN + ), + fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest); +fold_from_sequence( + MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> % If this file has a LowSQN less than the minimum, we can skip it if the % next file also has a LowSQN below the minimum {NextMinSQN, Acc0} = case Rest of [] -> - foldfile_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Acc, - Pid, - undefined, - FN); + foldfile_between_sequence( + MinSQN, + MinSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + Acc, + Pid, + undefined, + FN + ); [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> - foldfile_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Acc, - Pid, - undefined, - FN); + foldfile_between_sequence( + MinSQN, + MinSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + Acc, + Pid, + undefined, + FN + ); _ -> {MinSQN, Acc} end, - fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest). + fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest). 
-foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, - Acc, CDBpid, StartPos, FN) -> +foldfile_between_sequence( + MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) -> {FilterFun, InitAccFun, FoldFun} = FoldFuns, InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)}, case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} -> {AccMinSQN, FoldFun(BatchAcc, Acc)}; + {_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}} + when AccMinSQN >= JournalSQN -> + {AccMinSQN, FoldFun(BatchAcc, Acc)}; {LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> UpdAcc = FoldFun(BatchAcc, Acc), NextSQN = MaxSQN + 1, - foldfile_between_sequence(NextSQN, - NextSQN + ?LOADING_BATCH, - FoldFuns, - UpdAcc, - CDBpid, - LastPosition, - FN) + foldfile_between_sequence( + NextSQN, + NextSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + UpdAcc, + CDBpid, + LastPosition, + FN + ) end. - sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> lists:foldl( fun(FN, Acc) -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index b1c3f219..c8d241de 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -322,7 +322,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> FilterFun = fun(JKey, JVal, _Pos, Acc, ExtractFun) -> {SQN, InkTag, LedgerKey} = JKey, - case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of + case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of {?INKT_STND, {B, K}} -> % Ignore tombstones and non-matching Tags and Key changes % objects. 
@@ -335,14 +335,14 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> _ -> {VBin, _VSize} = ExtractFun(JVal), {Obj, _IdxSpecs} = - leveled_codec:revert_value_from_journal(VBin), - ToLoop = - case SQN of - MaxSQN -> stop; - _ -> loop - end, - {ToLoop, - {MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}} + leveled_codec:revert_value_from_journal( + VBin, + true + ), + { + case SQN of MaxSQN -> stop; _ -> loop end, + {MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]} + } end; _ -> {loop, Acc} diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 8d31dd71..81c1f6fa 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -9,6 +9,7 @@ fetchclocks_modifiedbetween/1, crossbucket_aae/1, handoff/1, + handoff_close/1, dollar_bucket_index/1, dollar_key_index/1, bigobject_memorycheck/1, @@ -22,6 +23,7 @@ all() -> [ fetchclocks_modifiedbetween, crossbucket_aae, handoff, + handoff_close, dollar_bucket_index, dollar_key_index, bigobject_memorycheck, @@ -1697,6 +1699,63 @@ dollar_key_index(_Config) -> ok = leveled_bookie:book_close(Bookie1), testutil:reset_filestructure(). 
+handoff_close(_Config) -> + RootPath = testutil:reset_filestructure(), + KeyCount = 500000, + Bucket = {<<"BType">>, <<"BName">>}, + StartOpts1 = + [ + {root_path, RootPath}, + {max_journalobjectcount, KeyCount + 1}, + {max_pencillercachesize, 12000}, + {sync_strategy, testutil:sync_strategy()} + ], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + ObjList1 = + testutil:generate_objects( + KeyCount div 10, + {fixed_binary, 1}, [], + crypto:strong_rand_bytes(512), + fun() -> [] end, + Bucket + ), + ObjList2 = + testutil:generate_objects( + KeyCount - (KeyCount div 10), + {fixed_binary, KeyCount div 10 + 1}, [], + crypto:strong_rand_bytes(512), + fun() -> [] end, + Bucket + ), + testutil:riakload(Bookie1, ObjList1), + FoldObjectsFun = + fun(_, _, _, Acc) -> + [os:timestamp()|Acc] + end, + {async, Runner} = + leveled_bookie:book_objectfold( + Bookie1, + ?RIAK_TAG, + {FoldObjectsFun, []}, + true, + sqn_order + ), + testutil:riakload(Bookie1, ObjList2), + TSList = Runner(), + QueryCompletionTime = os:timestamp(), + LastTS = hd(TSList), + io:format( + "Found ~w objects with Last TS ~w completion time ~w~n", + [length(TSList), LastTS, QueryCompletionTime] + ), + true = KeyCount div 10 == length(TSList), + TimeSinceLastObjectTouchedMS = + timer:now_diff(QueryCompletionTime, LastTS) div 1000, + true = TimeSinceLastObjectTouchedMS < 1000, + leveled_bookie:book_destroy(Bookie1), + testutil:reset_filestructure(). 
+ + %% @doc test that the riak specific $bucket indexes can be iterated %% using leveled's existing folders dollar_bucket_index(_Config) -> diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index e437b1bb..32a569de 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -143,7 +143,8 @@ end_per_suite(_Config) -> ok = logger:set_primary_config(level, notice), ok = logger:set_handler_config(default, level, all), ok = logger:set_handler_config(cth_log_redirect, level, all), - + % 10s delay to allow for any delete_pending files to close without crashing + timer:sleep(10000), ok. riak_object(Bucket, Key, Value, MetaData) ->