Skip to content
230 changes: 142 additions & 88 deletions lsm-tree/src-prototypes/ScheduledMerges.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE UnboxedTuples #-}

Expand Down Expand Up @@ -356,24 +355,36 @@ invariant conf@LSMConfig{..} (LSMContent _ levels ul) = do

levelsInvariant (ln+1) ls

-- All runs within a level "proper" (as opposed to the incoming runs
-- All regular runs within a level "proper" (as opposed to the incoming runs
-- being merged) should be of the correct size for the level.
expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s ()
expectedRunLengths ln rs ls =
case mergePolicyForLevel ln ls ul of
-- Levels using levelling have only one (incoming) run, which almost
-- always consists of an ongoing merge. The exception is when a
-- levelling run becomes too large and is promoted, in that case
-- initially there's no merge, but it is still represented as an
-- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
LevelLevelling -> assertST $ null rs && null ls
-- Runs in tiering levels usually fit that size, but they can be one
-- larger, if a run has been held back (creating a (T+1)-way merge).
LevelTiering -> assertST $ all (\r -> runToLevelNumber LevelTiering conf r `elem` [ln, ln+1]) rs
-- (This is actually still not really true, but will hold in practice.
-- In the pathological case, all runs passed to the next level can be
-- factor ((T+1)/T) too large, and there the same holding back can lead to
-- factor ((T+2)/T) etc., until at level 12 a run is two levels too large.
LevelLevelling -> do
-- Levelling can only occur on the last level.
assertST $ null ls
-- Levels using levelling have only one (incoming) run, which almost
-- always consists of an ongoing merge. The exception is when a
-- levelling run becomes too large and is promoted, in that case
-- initially there's no merge, but it is still represented as an
-- 'IncomingRun', using 'Single'. Thus there are no other resident
-- runs.
assertST $ null rs
LevelTiering -> do
-- There are no empty runs in tiering levels, as they are either:
-- 1. a mid-level, so deletes can't be dropped and merges can't result
-- in empty runs
-- 2. the first level, so their runs come directly from flushing the
-- write buffer.
assertST $ all (\r -> runSize r > 0) rs
-- Runs in tiering levels usually fit that size, but they can be one
-- larger, if a run has been held back (creating a (T+1)-way merge).
-- TODO: This is actually still not really true, but will hold in
-- practice. In the pathological case, all runs passed to the next
-- level can be factor ((T+1)/T) too large, so holding back there can
-- lead to factor ((T+2)/T) etc., until eventually at level 12 a run
-- is two levels too large.
assertST $ all (\r -> runToLevelNumber LevelTiering conf r `elem` [ln, ln+1]) rs

-- Incoming runs being merged also need to be of the right size, but the
-- conditions are more complicated.
Expand Down Expand Up @@ -1026,19 +1037,19 @@ update :: Tracer (ST s) Event -> LSM s -> Key -> Entry -> ST s ()
update tr (LSMHandle tid scr conf lsmr) k entry = do
traceWith tr $ UpdateEvent tid k entry
sc <- readSTRef scr
content@(LSMContent wb ls unionLevel) <- readSTRef lsmr
content@(LSMContent wb ls ul) <- readSTRef lsmr
modifySTRef' scr (+1)
supplyCreditsLevels (NominalCredit 1) ls
invariant conf content
let wb' = Map.insertWith combine k entry wb
if bufferSize wb' >= maxWriteBufferSize conf
then do
ls' <- increment (LevelEvent tid >$< tr) sc conf (bufferToRun wb') ls unionLevel
let content' = LSMContent Map.empty ls' unionLevel
(ls', ul') <- increment (LevelEvent tid >$< tr) sc conf (bufferToRun wb') ls ul
let content' = LSMContent Map.empty ls' ul'
invariant conf content'
writeSTRef lsmr content'
else
writeSTRef lsmr (LSMContent wb' ls unionLevel)
writeSTRef lsmr (LSMContent wb' ls ul)

supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
supplyMergeCredits (LSMHandle _ scr conf lsmr) credits = do
Expand Down Expand Up @@ -1160,26 +1171,6 @@ supplyUnionCredits (LSMHandle _ scr conf lsmr) (UnionCredits credits)
invariant conf content
pure c'

-- TODO: At some point the completed merging tree should be moved into the
-- regular levels, so it can be merged with other runs and last level merges can
-- happen again to drop deletes. Also, lookups then don't need to handle the
-- merging tree any more. There are two possible strategies:
--
-- 1. As soon as the merging tree completes, move the resulting run to the
-- regular levels. However, its size does generally not fit the last level,
-- which requires relaxing 'invariant' and adjusting 'increment'.
--
-- If the run is much larger than the resident and incoming runs of the last
-- level, it should also not be included into a merge yet, as that merge
-- would be expensive, but offer very little potential for compaction (the
-- run from the merging tree is already compacted after all). So it needs to
-- be bumped to the next level instead.
--
-- 2. Initially leave the completed run in the union level. Then every time a
-- new last level merge is created in 'increment', check if there is a
-- completed run in the union level with a size that fits the new merge. If
-- yes, move it over.

-- | Like 'remainingDebtMergingTree', but additionally asserts that the debt
-- never increases.
checkedUnionDebt :: MergingTree s -> STRef s Debt -> ST s Debt
Expand Down Expand Up @@ -1207,40 +1198,52 @@ mergeAcc mt = foldl (updateAcc com) Nothing . catMaybes
MergeLevel -> combine
MergeUnion -> combineUnion

-- | Convert an accumulated lookup entry into the user-facing lookup result.
-- A 'Delete' and an absent entry both report 'NotFound'; a 'Mupsert'
-- surfaces its value with no blob attached.
resultFromAcc :: LookupAcc -> LookupResult Value Blob
resultFromAcc acc =
    case acc of
      Just (Insert v b) -> Found v b
      Just (Mupsert v)  -> Found v Nothing
      Just Delete       -> NotFound
      Nothing           -> NotFound

-- | We handle lookups by accumulating results by going through the runs from
-- most recent to least recent, starting with the write buffer.
--
-- In the real implementation, this is done not on an individual 'LookupAcc',
-- but one for each key, i.e. @Vector (Maybe Entry)@.
doLookup :: Buffer -> [Run] -> UnionLevel s -> Key -> ST s (LookupResult Value Blob)
doLookup wb runs ul k = do
let acc0 = lookupBatch (Map.lookup k wb) k runs
case ul of
NoUnion ->
pure (convertAcc acc0)
Union tree _ -> do
treeBatches <- buildLookupTree tree
let treeResults = lookupBatch Nothing k <$> treeBatches
pure $ convertAcc $ foldLookupTree $
if null wb && null runs
then treeResults
else LookupNode MergeLevel [LookupBatch acc0, treeResults ]
doLookup wb runs ul k =
fmap resultFromAcc $
case ul of
NoUnion -> lookupRegular
Union tree _ | null wb && null runs -> lookupUnion tree
Union tree _ -> lookupBoth tree
where
convertAcc :: LookupAcc -> LookupResult Value Blob
convertAcc = \case
Nothing -> NotFound
Just (Insert v b) -> Found v b
Just (Mupsert v) -> Found v Nothing
Just Delete -> NotFound

-- | Perform a batch of lookups, accumulating the result onto an initial
-- 'LookupAcc'.
--
-- In a real implementation, this would take all keys at once and be in IO.
lookupBatch :: LookupAcc -> Key -> [Run] -> LookupAcc
lookupBatch acc k rs =
lookupRegular =
pure (lookupBatch k (Just wb) runs)

lookupUnion tree = do
treeBatches <- buildLookupTree tree
pure (foldLookupTree (lookupBatch k Nothing <$> treeBatches))

-- both regular and union level: submit multiple batches, combine in the end
lookupBoth tree = do
getCompletedMergingTree tree >>= \case
Just r -> do
-- This case is an optimisation over the one below. We only do a
-- single batch of lookups, but there's an extra allocation to
-- construct the list of runs.
pure (lookupBatch k (Just wb) (runs ++ [r]))
Comment on lines +1232 to +1235
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This optimisation is probably worth it in the real implementation, but we probably don't need it in the prototype. I'll temporarily leave it here while we discuss the possible approaches, but will remove it before merging this PR. A short comment hinting at the possible optimisation should be sufficient.

_ -> do
regularAcc <- lookupRegular
unionAcc <- lookupUnion tree
pure (mergeAcc MergeLevel [regularAcc, unionAcc])

-- | Perform a batch of lookups for a single key. In the real implementation,
-- this instead takes all keys at once and performs disk I\/O.
lookupBatch :: Key -> Maybe Buffer -> [Run] -> LookupAcc
lookupBatch k mwb rs =
let entries = [entry | r <- rs, Just entry <- [Map.lookup k r]]
in foldl (updateAcc combine) acc entries
in foldl (updateAcc combine) (Map.lookup k =<< mwb) entries

data LookupTree a = LookupBatch a
| LookupNode TreeMergeType [LookupTree a]
Expand Down Expand Up @@ -1422,23 +1425,54 @@ depositNominalCredit (NominalDebt nominalDebt)
increment :: forall s. Tracer (ST s) (EventAt EventDetail)
-> Counter
-> LSMConfig
-> Run -> Levels s -> UnionLevel s -> ST s (Levels s)
increment tr sc conf run0 ls0 ul = do
go 1 [run0] ls0
-> Run -> Levels s -> UnionLevel s -> ST s (Levels s, UnionLevel s)
increment tr sc conf run0 = do
go 1 [run0]
where
mergeTypeFor :: Levels s -> LevelMergeType
mergeTypeFor ls = mergeTypeForLevel ls ul

go :: Int -> [Run] -> Levels s -> ST s (Levels s)
go !ln incoming [] = do
traceWith tr' AddLevelEvent
let mergePolicy = mergePolicyForLevel ln [] ul
ir <- newLevelMerge tr' conf ln mergePolicy (mergeTypeFor []) incoming
pure (Level ir [] : [])
go :: Int -> [Run] -> Levels s -> UnionLevel s -> ST s (Levels s, UnionLevel s)
go !ln incoming [] ul = do
-- No existing level to add the incoming runs to, so we add a new one.
-- We first check if there is a completed union that would fit into it.
-- Note that migration wants to create a levelling level, so we don't
-- want to migrate into the first level (which always uses tiering).
case ul of
Union tree _ | ln > 1 -> do
getCompletedMergingTree tree >>= \case
Just run | runToLevelNumber LevelLevelling conf run <= ln ->
migrateUnionLevel run
_ ->
createNewLevel
_ ->
createNewLevel
where
-- The common case, simply create a new level.
createNewLevel = do
traceWith tr' AddLevelEvent
let mergePolicy = mergePolicyForLevel ln [] ul
ir <- newLevelMerge tr' conf ln mergePolicy (mergeTypeForLevel [] ul) incoming
pure (Level ir [] : [], ul)

-- Create a new regular level, migrating the union by adding it to the
-- merge. If we left the union in place, the new level would use tiering
-- since it's not the last level. But now it becomes the last level, so
-- we use levelling.
--
-- This is the same behaviour we'd see if the union level had been a
-- regular level already, consisting only of the completed union run.
-- The cost of the new merge is the same either way, adhering to the
-- usual bounds on costs of a merge.
migrateUnionLevel run = do
traceWith tr' AddLevelEvent
traceWith tr' UnionMigratedEvent
let ul' = NoUnion -- migrated
let mergePolicy = mergePolicyForLevel ln [] ul'
assertST $ mergePolicy == LevelLevelling
ir <- newLevelMerge tr' conf ln mergePolicy (mergeTypeForLevel [] ul') (incoming ++ [run])
pure (Level ir [] : [], ul')

tr' = contramap (EventAt sc ln) tr

go !ln incoming (Level ir rs : ls) = do
go !ln incoming (Level ir rs : ls) ul = do
r <- case ir of
Single r -> do
traceWith tr' $ SingleRunCompletedEvent r
Expand All @@ -1460,8 +1494,8 @@ increment tr sc conf run0 ls0 ul = do
LevelTiering | runTooSmallForLevel LevelTiering conf ln r -> do
traceWith tr' $ RunTooSmallForLevelEvent LevelTiering r

ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeFor ls) (incoming ++ [r])
pure (Level ir' rs : ls)
ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeForLevel ls ul) (incoming ++ [r])
pure (Level ir' rs : ls, ul)

-- This tiering level is now full. We take the completed merged run
-- (the previous incoming runs), plus all the other runs on this level
Expand All @@ -1471,17 +1505,17 @@ increment tr sc conf run0 ls0 ul = do
traceWith tr' $ LevelIsFullEvent LevelTiering

ir' <- newLevelMerge tr' conf ln LevelTiering MergeMidLevel incoming
ls' <- go (ln+1) resident ls
pure (Level ir' [] : ls')
(ls', ul') <- go (ln+1) resident ls ul
pure (Level ir' [] : ls', ul')

-- This tiering level is not yet full. We move the completed merged run
-- into the level proper, and start the new merge for the incoming runs.
LevelTiering -> do
traceWith tr' $ LevelIsNotFullEvent LevelTiering

ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeFor ls) incoming
ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeForLevel ls ul) incoming
traceWith tr' (AddRunEvent resident)
pure (Level ir' resident : ls)
pure (Level ir' resident : ls, ul)

-- The final level is using levelling. If the existing completed merge
-- run is too large for this level, we promote the run to the next
Expand All @@ -1492,17 +1526,17 @@ increment tr sc conf run0 ls0 ul = do

assert (null rs && null ls) $ pure ()
ir' <- newLevelMerge tr' conf ln LevelTiering MergeMidLevel incoming
ls' <- go (ln+1) [r] []
pure (Level ir' [] : ls')
(ls', ul') <- go (ln+1) [r] [] ul
pure (Level ir' [] : ls', ul')

-- Otherwise we start merging the incoming runs into the run.
LevelLevelling -> do
traceWith tr' $ LevelIsNotFullEvent LevelLevelling

assert (null rs && null ls) $ pure ()
ir' <- newLevelMerge tr' conf ln LevelLevelling (mergeTypeFor ls)
ir' <- newLevelMerge tr' conf ln LevelLevelling (mergeTypeForLevel ls ul)
(incoming ++ [r])
pure (Level ir' [] : [])
pure (Level ir' [] : [], ul)

where
tr' = contramap (EventAt sc ln) tr
Expand All @@ -1522,6 +1556,7 @@ newLevelMerge tr conf@LSMConfig{..} level mergePolicy mergeType rs = do
mergeDebt = totalDebt physicalDebt,
mergeRuns = rs
}
-- Can be one more, either due to holding back runs or migrating a union.
assertST (length rs `elem` [configSizeRatio, configSizeRatio + 1])
assertWithMsgM $ leq (totalDebt physicalDebt) maxPhysicalDebt
nominalCreditVar <- newSTRef (NominalCredit 0)
Expand Down Expand Up @@ -1791,6 +1826,10 @@ expectCompletedChildren (PendingMerge mt prs trees) = do
expectCompletedMergingTree :: HasCallStack => MergingTree s -> ST s Run
expectCompletedMergingTree = expectInvariant . isCompletedMergingTree

-- | If the merging tree has fully completed, return its final run,
-- otherwise return 'Nothing'. Only inspects the tree; it is not modified.
getCompletedMergingTree :: MergingTree s -> ST s (Maybe Run)
getCompletedMergingTree mt = do
    result <- evalInvariant (isCompletedMergingTree mt)
    pure $ case result of
      Left  _ -> Nothing
      Right r -> Just r

-------------------------------------------------------------------------------
-- Measurements
--
Expand Down Expand Up @@ -1945,6 +1984,7 @@ data EventDetail =
mergeSize :: Int
}
| SingleRunCompletedEvent Run
| UnionMigratedEvent

| RunTooSmallForLevelEvent MergePolicyForLevel Run
| LevelIsFullEvent MergePolicyForLevel
Expand Down Expand Up @@ -1979,3 +2019,17 @@ instance QC.Arbitrary LevelMergeType where

-- | Uniformly pick one of the two tree merge types.
instance QC.Arbitrary TreeMergeType where
  arbitrary = QC.oneof [pure MergeLevel, pure MergeUnion]

-- | Generate small but interesting configurations: write buffers holding
-- 1..10 entries and size ratios between 2 and 8 (4 being the default).
instance QC.Arbitrary LSMConfig where
  arbitrary = do
    bufSize <- QC.chooseInt (1, 10)
    ratio   <- QC.chooseInt (2, 8)
    pure (LSMConfig bufSize ratio)
  shrink (LSMConfig size ratio) =
    [ LSMConfig size' ratio'
    | (size', ratio') <- QC.liftShrink2 QC.shrink towardsDefaultRatio (size, ratio)
    , size' >= 1  -- the write buffer must be able to hold at least one entry
    ]
    where
      -- A single shrink step: jump straight to the default ratio of four.
      towardsDefaultRatio r
        | r == 4    = []
        | otherwise = [4]
Loading