Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions lsm-tree/src-prototypes/ScheduledMerges.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE UnboxedTuples #-}

{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}
{-# OPTIONS_GHC -Wno-partial-fields #-}

-- | A prototype of an LSM with explicitly scheduled incremental merges.
Expand Down Expand Up @@ -162,7 +161,11 @@ type LevelNo = Int
-- | A level is a sequence of resident runs at this level, prefixed by an
-- incoming run, which is usually multiple runs that are being merged. Once
-- completed, the resulting run will become a resident run at this level.
--
-- Levels can also be empty, although we currently never create empty levels.
-- We plan to make use of them when migrating a completed union level.
data Level s = Level !(IncomingRun s) ![Run]
| EmptyLevel

-- | We represent single runs specially, rather than putting them in as a
-- 'CompletedMerge'. This is for two reasons: to see statically that it's a
Expand Down Expand Up @@ -311,7 +314,7 @@ lookupWriteBuffer :: Key -> WriteBuffer -> Maybe Entry
lookupWriteBuffer k = Map.lookup k . bufferEntries

-- | Flush a write buffer. In the real implementation, this involves IO.
-- Note that we should not never flush an empty write buffer.
-- Note that we should never flush an empty write buffer.
flushWriteBuffer :: WriteBuffer -> Run
flushWriteBuffer (WriteBuffer m) = assert (not (null m)) (Run m)

Expand Down Expand Up @@ -355,6 +358,9 @@ invariant conf@LSMConfig{..} (LSMContent _ levels ul) = do
levelsInvariant :: Int -> Levels s -> ST s ()
levelsInvariant !_ [] = pure ()

levelsInvariant !_ (EmptyLevel : ls) = do
assertST $ not (null ls) -- last level shouldn't be empty
Comment on lines +361 to +362
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 makes sense, we don't want "trailing empty levels" so to speak


levelsInvariant !ln (Level ir rs : ls) = do
mrs <- case ir of
Single r ->
Expand Down Expand Up @@ -917,7 +923,7 @@ newMergingRun mergeType runs = do
assertST $ length runs > 1
-- in some cases, no merging is required at all
(debt, state) <- case filter (\r -> runSize r > 0) runs of
[] -> let (r:_) = runs -- just reuse the empty input
[] -> let r = head runs -- just reuse the empty input
in pure (runSize r, CompletedMerge r)
[r] -> pure (runSize r, CompletedMerge r)
rs -> do
Expand Down Expand Up @@ -1362,11 +1368,12 @@ newtype NominalDebt = NominalDebt Credit
-- inserting without calling 'supplyUnionCredits'.
supplyCreditsLevels :: NominalCredit -> Levels s -> ST s ()
supplyCreditsLevels nominalDeposit =
traverse_ $ \(Level ir _rs) -> do
case ir of
Single{} -> pure ()
Merging _mp nominalDebt nominalCreditVar
mr@(MergingRun _ physicalDebt _) -> do
traverse_ $ \lvl -> do
case lvl of
EmptyLevel -> pure ()
Level Single{} _ -> pure ()
Level (Merging _mp nominalDebt nominalCreditVar
mr@(MergingRun _ physicalDebt _)) _ -> do

nominalCredit <- depositNominalCredit
nominalDebt nominalCreditVar nominalDeposit
Expand Down Expand Up @@ -1467,6 +1474,14 @@ increment tr sc conf run0 ls0 ul = do
where
tr' = contramap (EventAt sc ln) tr

go !ln incoming (EmptyLevel : ls) = do
assertST $ mergePolicyForLevel ln ls ul == LevelTiering -- mid level
traceWith tr' LevelWasEmptyEvent
ir <- newLevelMerge tr' conf ln LevelTiering (mergeTypeFor ls) incoming
pure (Level ir [] : ls)
where
tr' = contramap (EventAt sc ln) tr

go !ln incoming (Level ir rs : ls) = do
r <- case ir of
Single r -> do
Expand Down Expand Up @@ -1629,24 +1644,19 @@ newLevelMerge tr conf@LSMConfig{..} level mergePolicy mergeType rs = do

-- | Ensures that the merge contains more than one input, avoiding creating a
-- pending merge where possible.
newPendingLevelMerge :: [IncomingRun s]
newPendingLevelMerge :: [PreExistingRun s]
-> Maybe (MergingTree s)
-> ST s (Maybe (MergingTree s))
newPendingLevelMerge [] t = pure t
newPendingLevelMerge [Single r] Nothing =
newPendingLevelMerge [PreExistingRun r] Nothing =
Just . MergingTree <$> newSTRef (CompletedTreeMerge r)
newPendingLevelMerge [Merging{}] Nothing =
newPendingLevelMerge [PreExistingMergingRun{}] Nothing =
-- This case should never occur. If there is a single entry in the list,
-- there can only be one level in the input table. At level 1 there are no
-- merging runs, it must be a Single run flushed from a write buffer.
error "newPendingLevelMerge: singleton Merging run"
newPendingLevelMerge irs tree = do
let prs = map incomingToPreExistingRun irs
st = PendingTreeMerge (PendingLevelMerge prs tree)
Just . MergingTree <$> newSTRef st
where
incomingToPreExistingRun (Single r) = PreExistingRun r
incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr
newPendingLevelMerge prs t = do
Just . MergingTree <$> newSTRef (PendingTreeMerge (PendingLevelMerge prs t))

-- | Ensures that the merge contains more than one input.
newPendingUnionMerge :: [MergingTree s] -> ST s (Maybe (MergingTree s))
Expand All @@ -1663,9 +1673,14 @@ contentToMergingTree (LSMContent wb ls ul) =
-- flush the write buffer (but this should not modify the content)
buffer
| writeBufferSize wb == 0 = Nothing
| otherwise = Just (Single (flushWriteBuffer wb))
| otherwise = Just (PreExistingRun (flushWriteBuffer wb))

levels = flip concatMap ls $ \case
EmptyLevel -> []
Level ir rs -> toPreExisting ir : map PreExistingRun rs

levels = flip concatMap ls $ \(Level ir rs) -> ir : map Single rs
toPreExisting (Single r) = PreExistingRun r
toPreExisting (Merging _ _ _ mr) = PreExistingMergingRun mr

trees = case ul of
NoUnion -> Nothing
Expand Down Expand Up @@ -1841,6 +1856,7 @@ flattenLevels :: Levels s -> ST s [[Run]]
flattenLevels = mapM flattenLevel

flattenLevel :: Level s -> ST s [Run]
flattenLevel EmptyLevel = pure []
flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir

flattenIncomingRun :: IncomingRun s -> ST s [Run]
Expand Down Expand Up @@ -1909,6 +1925,8 @@ dumpRepresentation (LSMHandle _ _ _conf lsmr) = do
pure (wb, levels, tree)

dumpLevel :: Level s -> ST s LevelRepresentation
dumpLevel EmptyLevel =
pure (Nothing, [])
dumpLevel (Level (Single r) rs) =
pure (Nothing, (r:rs))
dumpLevel (Level (Merging mp nd ncv (MergingRun mt _ ref)) rs) = do
Expand Down Expand Up @@ -1977,6 +1995,7 @@ data EventDetail =
| RunTooSmallForLevelEvent MergePolicyForLevel Run
| LevelIsFullEvent MergePolicyForLevel
| LevelIsNotFullEvent MergePolicyForLevel
| LevelWasEmptyEvent
deriving stock Show

-------------------------------------------------------------------------------
Expand Down
Loading