Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 98 additions & 52 deletions lsm-tree/src-prototypes/ScheduledMerges.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE UnboxedTuples #-}

Expand Down Expand Up @@ -106,7 +105,7 @@ import Data.Foldable (for_, toList, traverse_)
import Data.Functor.Contravariant
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes)
import Data.Maybe (catMaybes, maybeToList)
import Data.Primitive.Types
import Data.STRef

Expand Down Expand Up @@ -150,7 +149,7 @@ type Counter = Int
-- | The levels of the table, from most to least recently inserted.
data LSMContent s =
LSMContent
Buffer -- ^ write buffer is level 0 of the table, in-memory
WriteBuffer -- ^ write buffer is level 0 of the table, in-memory
(Levels s) -- ^ \"regular\" levels 1+, on disk in real implementation
(UnionLevel s) -- ^ a potential last level

Expand Down Expand Up @@ -288,17 +287,33 @@ pattern PendingMerge :: TreeMergeType
-> PendingMerge s
pattern PendingMerge mt prs ts <- (pendingContent -> (mt, prs, ts))

type Run = Map Key Entry
type Buffer = Map Key Entry

bufferToRun :: Buffer -> Run
bufferToRun = id
-- | A run, wrapping a map from keys to entries.
newtype Run = Run
  { runEntries :: Map Key Entry
    -- ^ the entries stored in the run, indexed by key
  }
  deriving newtype (Show)

runSize :: Run -> Int
runSize = Map.size
runSize = Map.size . runEntries

-- | Look up the entry stored for a key in a run, if there is one.
lookupRun :: Key -> Run -> Maybe Entry
lookupRun key (Run entries) = Map.lookup key entries

-- | The write buffer, wrapping a map from keys to entries.
newtype WriteBuffer = WriteBuffer
  { bufferEntries :: Map Key Entry
    -- ^ the entries currently held in the buffer, indexed by key
  }

-- | A write buffer that holds no entries.
emptyWriteBuffer :: WriteBuffer
emptyWriteBuffer = WriteBuffer { bufferEntries = Map.empty }

-- | The number of keys currently held in the write buffer.
writeBufferSize :: WriteBuffer -> Int
writeBufferSize (WriteBuffer entries) = Map.size entries

-- | Insert an entry into the write buffer. If the key is already buffered,
-- the new entry is combined with the existing one using 'combine'.
insertWriteBuffer :: Key -> Entry -> WriteBuffer -> WriteBuffer
insertWriteBuffer key new (WriteBuffer entries) =
    WriteBuffer (Map.insertWith combine key new entries)

-- | Look up the entry buffered for a key, if there is one.
lookupWriteBuffer :: Key -> WriteBuffer -> Maybe Entry
lookupWriteBuffer key (WriteBuffer entries) = Map.lookup key entries

bufferSize :: Buffer -> Int
bufferSize = Map.size
-- | Flush a write buffer. In the real implementation, this involves IO.
-- Note that we should never flush an empty write buffer.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- Note that we should not never flush an empty write buffer.
-- Note that we should not ever flush an empty write buffer.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we should always flush empty write buffers? 😜

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I will fix that as part of the next PR.

flushWriteBuffer :: WriteBuffer -> Run
flushWriteBuffer wb =
    -- the assertion rejects an empty buffer, which would yield an empty run
    assert (not (Map.null entries)) (Run entries)
  where
    entries = bufferEntries wb

type Entry = Update Value Blob

Expand Down Expand Up @@ -356,24 +371,35 @@ invariant conf@LSMConfig{..} (LSMContent _ levels ul) = do

levelsInvariant (ln+1) ls

-- All runs within a level "proper" (as opposed to the incoming runs
-- All regular runs within a level "proper" (as opposed to the incoming runs
-- being merged) should be of the correct size for the level.
expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s ()
expectedRunLengths ln rs ls =
case mergePolicyForLevel ln ls ul of
-- Levels using levelling have only one (incoming) run, which almost
-- always consists of an ongoing merge. The exception is when a
-- levelling run becomes too large and is promoted, in that case
-- initially there's no merge, but it is still represented as an
-- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
LevelLevelling -> assertST $ null rs && null ls
-- Runs in tiering levels usually fit that size, but they can be one
-- larger, if a run has been held back (creating a (T+1)-way merge).
LevelTiering -> assertST $ all (\r -> runToLevelNumber LevelTiering conf r `elem` [ln, ln+1]) rs
-- (This is actually still not really true, but will hold in practice.
-- In the pathological case, all runs passed to the next level can be
-- factor ((T+1)/T) too large, and there the same holding back can lead to
-- factor ((T+2)/T) etc., until at level 12 a run is two levels too large.
LevelLevelling -> do
-- Levelling can only occur on the last level.
assertST $ null ls
-- Levels using levelling have only one (incoming) run, which almost
-- always consists of an ongoing merge. The exception is when a
-- levelling run becomes too large and is promoted, in that case
-- initially there's no merge, but it is still represented as an
-- 'IncomingRun', using 'Single'. Thus there are no other resident
-- runs.
assertST $ null rs
LevelTiering -> do
-- There are no empty runs in tiering levels, as they are either:
-- 1. a mid-level, so deletes can't be dropped and merges can't result
-- in empty runs
-- 2. the first level, so their runs come directly from flushing the
-- write buffer.
Comment thread
jorisdral marked this conversation as resolved.
assertST $ all (\r -> runSize r > 0) rs
-- Runs in tiering levels usually fit that level's size, but they can
-- be slightly larger if a run has been held back (creating a
-- (T+1)-way merge).
--
-- TODO: Holding back runs can theoretically result in runs that are
-- more than one size too large. See issue #829.
assertST $ all (\r -> runToLevelNumber LevelTiering conf r `elem` [ln, ln+1]) rs

-- Incoming runs being merged also need to be of the right size, but the
-- conditions are more complicated.
Expand Down Expand Up @@ -904,8 +930,10 @@ newMergingRun mergeType runs = do

mergek :: IsMergeType t => t -> [Run] -> Run
mergek t =
(if isLastLevel t then Map.filter (/= Delete) else id)
Run
. (if isLastLevel t then Map.filter (/= Delete) else id)
. Map.unionsWith (if isUnion t then combineUnion else combine)
. map runEntries

-- | Combines two entries that have been performed after another. Therefore, the
-- newer one overwrites the old one (or modifies it for 'Mupsert'). Only take a
Expand Down Expand Up @@ -992,7 +1020,7 @@ newWith tr tid conf
| otherwise = do
traceWith tr $ NewTableEvent tid conf
c <- newSTRef 0
lsm <- newSTRef (LSMContent Map.empty [] NoUnion)
lsm <- newSTRef (LSMContent emptyWriteBuffer [] NoUnion)
pure (LSMHandle tid c conf lsm)

inserts :: Tracer (ST s) Event -> LSM s -> [(Key, Value, Maybe Blob)] -> ST s ()
Expand Down Expand Up @@ -1030,11 +1058,12 @@ update tr (LSMHandle tid scr conf lsmr) k entry = do
modifySTRef' scr (+1)
supplyCreditsLevels (NominalCredit 1) ls
invariant conf content
let wb' = Map.insertWith combine k entry wb
if bufferSize wb' >= maxWriteBufferSize conf
let wb' = insertWriteBuffer k entry wb
if writeBufferSize wb' >= maxWriteBufferSize conf
then do
ls' <- increment (LevelEvent tid >$< tr) sc conf (bufferToRun wb') ls unionLevel
let content' = LSMContent Map.empty ls' unionLevel
let r = flushWriteBuffer wb'
ls' <- increment (LevelEvent tid >$< tr) sc conf r ls unionLevel
let content' = LSMContent emptyWriteBuffer ls' unionLevel
invariant conf content'
writeSTRef lsmr content'
else
Expand Down Expand Up @@ -1099,7 +1128,7 @@ unions tr childTid lsms = do
Just tree -> do
debt <- fst <$> remainingDebtMergingTree tree
Union tree <$> newSTRef debt
lsmr <- newSTRef (LSMContent Map.empty [] unionLevel)
lsmr <- newSTRef (LSMContent emptyWriteBuffer [] unionLevel)
c <- newSTRef 0
pure (LSMHandle childTid c conf lsmr)

Expand Down Expand Up @@ -1212,17 +1241,17 @@ mergeAcc mt = foldl (updateAcc com) Nothing . catMaybes
--
-- In the real implementation, this is done not on an individual 'LookupAcc',
-- but one for each key, i.e. @Vector (Maybe Entry)@.
doLookup :: Buffer -> [Run] -> UnionLevel s -> Key -> ST s (LookupResult Value Blob)
doLookup :: WriteBuffer -> [Run] -> UnionLevel s -> Key -> ST s (LookupResult Value Blob)
doLookup wb runs ul k = do
let acc0 = lookupBatch (Map.lookup k wb) k runs
let acc0 = lookupBatch (lookupWriteBuffer k wb) k runs
case ul of
NoUnion ->
pure (convertAcc acc0)
Union tree _ -> do
treeBatches <- buildLookupTree tree
let treeResults = lookupBatch Nothing k <$> treeBatches
pure $ convertAcc $ foldLookupTree $
if null wb && null runs
if writeBufferSize wb == 0 && null runs
then treeResults
else LookupNode MergeLevel [LookupBatch acc0, treeResults ]
where
Expand All @@ -1239,7 +1268,7 @@ doLookup wb runs ul k = do
-- In a real implementation, this would take all keys at once and be in IO.
lookupBatch :: LookupAcc -> Key -> [Run] -> LookupAcc
lookupBatch acc k rs =
let entries = [entry | r <- rs, Just entry <- [Map.lookup k r]]
let entries = [entry | r <- rs, Just entry <- [lookupRun k r]]
in foldl (updateAcc combine) acc entries

data LookupTree a = LookupBatch a
Expand Down Expand Up @@ -1609,7 +1638,7 @@ newPendingLevelMerge [Single r] Nothing =
newPendingLevelMerge [Merging{}] Nothing =
-- This case should never occur. If there is a single entry in the list,
-- there can only be one level in the input table. At level 1 there are no
-- merging runs, so it must be a PreExistingRun.
-- merging runs, so it must be a Single run flushed from a write buffer.
error "newPendingLevelMerge: singleton Merging run"
newPendingLevelMerge irs tree = do
let prs = map incomingToPreExistingRun irs
Expand All @@ -1629,12 +1658,12 @@ newPendingUnionMerge trees = do

contentToMergingTree :: LSMContent s -> ST s (Maybe (MergingTree s))
contentToMergingTree (LSMContent wb ls ul) =
newPendingLevelMerge (buffers ++ levels) trees
newPendingLevelMerge (maybeToList buffer ++ levels) trees
where
-- flush the write buffer (but this should not modify the content)
buffers
| bufferSize wb == 0 = []
| otherwise = [Single (bufferToRun wb)]
buffer
| writeBufferSize wb == 0 = Nothing
| otherwise = Just (Single (flushWriteBuffer wb))

levels = flip concatMap ls $ \(Level ir rs) -> ir : map Single rs

Expand Down Expand Up @@ -1799,7 +1828,7 @@ data MTree r = MLeaf r
| MNode TreeMergeType [MTree r]
deriving stock (Eq, Foldable, Functor, Show)

allLevels :: LSM s -> ST s (Buffer, [[Run]], Maybe (MTree Run))
allLevels :: LSM s -> ST s (WriteBuffer, [[Run]], Maybe (MTree Run))
allLevels (LSMHandle _ _ _conf lsmr) = do
LSMContent wb ls ul <- readSTRef lsmr
rs <- flattenLevels ls
Expand Down Expand Up @@ -1851,8 +1880,9 @@ logicalValue lsm = do
(wb, levels, tree) <- allLevels lsm
let r = mergek
MergeLevel
(wb : concat levels ++ toList (mergeTree <$> tree))
pure (Map.mapMaybe justInsert r)
(Run (bufferEntries wb) -- we don't flush, but treat wb as a run
: concat levels ++ toList (mergeTree <$> tree))
pure (Map.mapMaybe justInsert (runEntries r))
where
mergeTree :: MTree Run -> Run
mergeTree (MLeaf r) = r
Expand All @@ -1862,7 +1892,7 @@ logicalValue lsm = do
justInsert Delete = Nothing
justInsert (Mupsert v) = Just (v, Nothing)

type Representation = (Run, [LevelRepresentation], Maybe (MTree Run))
type Representation = (WriteBuffer, [LevelRepresentation], Maybe (MTree Run))

type LevelRepresentation =
(Maybe (MergePolicyForLevel, NominalDebt, NominalCredit,
Expand Down Expand Up @@ -1892,25 +1922,23 @@ dumpLevel (Level (Merging mp nd ncv (MergingRun mt _ ref)) rs) = do
representationShape :: Representation
-> (Int, [([Int], [Int])], Maybe (MTree Int))
representationShape (wb, levels, tree) =
(summaryRun wb, map summaryLevel levels, fmap (fmap summaryRun) tree)
(writeBufferSize wb, map summaryLevel levels, fmap (fmap runSize) tree)
where
summaryLevel (mmr, rs) =
let (ongoing, complete) = summaryMR mmr
in (ongoing, complete <> map summaryRun rs)

summaryRun = runSize
in (ongoing, complete <> map runSize rs)

summaryMR = \case
Nothing -> ([], [])
Just (_, _, _, _, CompletedMerge r) -> ([], [summaryRun r])
Just (_, _, _, _, OngoingMerge _ rs _) -> (map summaryRun rs, [])
Just (_, _, _, _, CompletedMerge r) -> ([], [runSize r])
Just (_, _, _, _, OngoingMerge _ rs _) -> (map runSize rs, [])

-------------------------------------------------------------------------------
-- Tracing
--

-- TODO: these events are incomplete, in particular we should also trace what
-- happens in the union level.
-- happens in the union level. Somewhat related: issue #445.
data Event =
NewTableEvent TableId LSMConfig
| UpdateEvent TableId Key Entry
Expand Down Expand Up @@ -1974,8 +2002,26 @@ instance (QC.Arbitrary v, QC.Arbitrary b) => QC.Arbitrary (Update v b) where
, (1, pure Delete)
]

-- | Runs are generated and shrunk via their underlying map of entries.
instance QC.Arbitrary Run where
  arbitrary = fmap Run QC.arbitrary
  shrink (Run entries) = [Run smaller | smaller <- QC.shrink entries]

-- | Generates either 'MergeMidLevel' or 'MergeLastLevel'. No 'shrink' is
-- defined here, so the class default applies.
instance QC.Arbitrary LevelMergeType where
arbitrary = QC.elements [MergeMidLevel, MergeLastLevel]

-- | Generates either 'MergeLevel' or 'MergeUnion'. No 'shrink' is defined
-- here, so the class default applies.
instance QC.Arbitrary TreeMergeType where
arbitrary = QC.elements [MergeLevel, MergeUnion]

-- | Generates configurations with a maximum write buffer size in @[1, 10]@
-- and a size ratio in @[2, 8]@.
instance QC.Arbitrary LSMConfig where
  arbitrary = do
      bufSize <- QC.chooseInt (1, 10)
      ratio   <- QC.chooseInt (2, 8)
      pure LSMConfig
        { configMaxWriteBufferSize = bufSize
        , configSizeRatio          = ratio
        }

  -- Shrink the two components independently, discarding candidates whose
  -- write buffer size would drop below one.
  shrink (LSMConfig size ratio) =
      [ LSMConfig smallerSize newRatio
      | (smallerSize, newRatio) <-
          QC.liftShrink2 QC.shrink shrinkRatio (size, ratio)
      , smallerSize > 0
      ]
    where
      -- try shrinking to four, the default ratio; four itself is minimal
      shrinkRatio r
        | r == 4    = []
        | otherwise = [4]
Loading
Loading