1- {-# LANGUAGE DataKinds #-}
21{-# LANGUAGE PatternSynonyms #-}
32{-# LANGUAGE UnboxedTuples #-}
43
@@ -106,7 +105,7 @@ import Data.Foldable (for_, toList, traverse_)
106105import Data.Functor.Contravariant
107106import Data.Map.Strict (Map )
108107import qualified Data.Map.Strict as Map
109- import Data.Maybe (catMaybes )
108+ import Data.Maybe (catMaybes , maybeToList )
110109import Data.Primitive.Types
111110import Data.STRef
112111
@@ -150,7 +149,7 @@ type Counter = Int
150149-- | The levels of the table, from most to least recently inserted.
151150data LSMContent s =
152151 LSMContent
153- Buffer -- ^ write buffer is level 0 of the table, in-memory
152+ WriteBuffer -- ^ write buffer is level 0 of the table, in-memory
154153 (Levels s ) -- ^ \"regular\" levels 1+, on disk in real implementation
155154 (UnionLevel s ) -- ^ a potential last level
156155
@@ -288,17 +287,33 @@ pattern PendingMerge :: TreeMergeType
288287 -> PendingMerge s
289288pattern PendingMerge mt prs ts <- (pendingContent -> (mt, prs, ts))
290289
291- type Run = Map Key Entry
292- type Buffer = Map Key Entry
293-
294- bufferToRun :: Buffer -> Run
295- bufferToRun = id
290+ newtype Run = Run { runEntries :: Map Key Entry }
291+ deriving newtype Show
296292
297293runSize :: Run -> Int
298- runSize = Map. size
294+ runSize = Map. size . runEntries
295+
296+ lookupRun :: Key -> Run -> Maybe Entry
297+ lookupRun k = Map. lookup k . runEntries
298+
299+ newtype WriteBuffer = WriteBuffer { bufferEntries :: Map Key Entry }
300+
301+ emptyWriteBuffer :: WriteBuffer
302+ emptyWriteBuffer = WriteBuffer Map. empty
303+
304+ writeBufferSize :: WriteBuffer -> Int
305+ writeBufferSize = Map. size . bufferEntries
306+
307+ insertWriteBuffer :: Key -> Entry -> WriteBuffer -> WriteBuffer
308+ insertWriteBuffer k e = WriteBuffer . Map. insertWith combine k e . bufferEntries
309+
310+ lookupWriteBuffer :: Key -> WriteBuffer -> Maybe Entry
311+ lookupWriteBuffer k = Map. lookup k . bufferEntries
299312
300- bufferSize :: Buffer -> Int
301- bufferSize = Map. size
313+ -- | Flush a write buffer. In the real implementation, this involves IO.
314+ -- Note that we should not never flush an empty write buffer.
315+ flushWriteBuffer :: WriteBuffer -> Run
316+ flushWriteBuffer (WriteBuffer m) = assert (not (null m)) (Run m)
302317
303318type Entry = Update Value Blob
304319
@@ -356,24 +371,35 @@ invariant conf@LSMConfig{..} (LSMContent _ levels ul) = do
356371
357372 levelsInvariant (ln+ 1 ) ls
358373
359- -- All runs within a level "proper" (as opposed to the incoming runs
374+ -- All regular runs within a level "proper" (as opposed to the incoming runs
360375 -- being merged) should be of the correct size for the level.
361376 expectedRunLengths :: Int -> [Run ] -> [Level s ] -> ST s ()
362377 expectedRunLengths ln rs ls =
363378 case mergePolicyForLevel ln ls ul of
364- -- Levels using levelling have only one (incoming) run, which almost
365- -- always consists of an ongoing merge. The exception is when a
366- -- levelling run becomes too large and is promoted, in that case
367- -- initially there's no merge, but it is still represented as an
368- -- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
369- LevelLevelling -> assertST $ null rs && null ls
370- -- Runs in tiering levels usually fit that size, but they can be one
371- -- larger, if a run has been held back (creating a (T+1)-way merge).
372- LevelTiering -> assertST $ all (\ r -> runToLevelNumber LevelTiering conf r `elem` [ln, ln+ 1 ]) rs
373- -- (This is actually still not really true, but will hold in practice.
374- -- In the pathological case, all runs passed to the next level can be
375- -- factor ((T+1)/T) too large, and there the same holding back can lead to
376- -- factor ((T+2)/T) etc., until at level 12 a run is two levels too large.
379+ LevelLevelling -> do
380+ -- Levelling can only occur on the last level.
381+ assertST $ null ls
382+ -- Levels using levelling have only one (incoming) run, which almost
383+ -- always consists of an ongoing merge. The exception is when a
384+ -- levelling run becomes too large and is promoted, in that case
385+ -- initially there's no merge, but it is still represented as an
386+ -- 'IncomingRun', using 'Single'. Thus there are no other resident
387+ -- runs.
388+ assertST $ null rs
389+ LevelTiering -> do
390+ -- There are no empty runs in tiering levels, as they are either:
391+ -- 1. a mid-level, so deletes can't be dropped and merges can't result
392+ -- in empty runs
393+ -- 2. the first level, so their runs come directly from flushing the
394+ -- write buffer.
395+ assertST $ all (\ r -> runSize r > 0 ) rs
396+ -- Runs in tiering levels usually fit that level's size, but they can
397+ -- be slightly larger if a run has been held back (creating a
398+ -- (T+1)-way merge).
399+ --
400+ -- TODO: Holding back runs can theoretically result in runs that are
401+ -- more than one size too large. See issue #829.
402+ assertST $ all (\ r -> runToLevelNumber LevelTiering conf r `elem` [ln, ln+ 1 ]) rs
377403
378404 -- Incoming runs being merged also need to be of the right size, but the
379405 -- conditions are more complicated.
@@ -904,8 +930,10 @@ newMergingRun mergeType runs = do
904930
905931mergek :: IsMergeType t => t -> [Run ] -> Run
906932mergek t =
907- (if isLastLevel t then Map. filter (/= Delete ) else id )
933+ Run
934+ . (if isLastLevel t then Map. filter (/= Delete ) else id )
908935 . Map. unionsWith (if isUnion t then combineUnion else combine)
936+ . map runEntries
909937
910938-- | Combines two entries that have been performed after another. Therefore, the
911939-- newer one overwrites the old one (or modifies it for 'Mupsert'). Only take a
@@ -992,7 +1020,7 @@ newWith tr tid conf
9921020 | otherwise = do
9931021 traceWith tr $ NewTableEvent tid conf
9941022 c <- newSTRef 0
995- lsm <- newSTRef (LSMContent Map. empty [] NoUnion )
1023+ lsm <- newSTRef (LSMContent emptyWriteBuffer [] NoUnion )
9961024 pure (LSMHandle tid c conf lsm)
9971025
9981026inserts :: Tracer (ST s ) Event -> LSM s -> [(Key , Value , Maybe Blob )] -> ST s ()
@@ -1030,11 +1058,12 @@ update tr (LSMHandle tid scr conf lsmr) k entry = do
10301058 modifySTRef' scr (+ 1 )
10311059 supplyCreditsLevels (NominalCredit 1 ) ls
10321060 invariant conf content
1033- let wb' = Map. insertWith combine k entry wb
1034- if bufferSize wb' >= maxWriteBufferSize conf
1061+ let wb' = insertWriteBuffer k entry wb
1062+ if writeBufferSize wb' >= maxWriteBufferSize conf
10351063 then do
1036- ls' <- increment (LevelEvent tid >$< tr) sc conf (bufferToRun wb') ls unionLevel
1037- let content' = LSMContent Map. empty ls' unionLevel
1064+ let r = flushWriteBuffer wb'
1065+ ls' <- increment (LevelEvent tid >$< tr) sc conf r ls unionLevel
1066+ let content' = LSMContent emptyWriteBuffer ls' unionLevel
10381067 invariant conf content'
10391068 writeSTRef lsmr content'
10401069 else
@@ -1099,7 +1128,7 @@ unions tr childTid lsms = do
10991128 Just tree -> do
11001129 debt <- fst <$> remainingDebtMergingTree tree
11011130 Union tree <$> newSTRef debt
1102- lsmr <- newSTRef (LSMContent Map. empty [] unionLevel)
1131+ lsmr <- newSTRef (LSMContent emptyWriteBuffer [] unionLevel)
11031132 c <- newSTRef 0
11041133 pure (LSMHandle childTid c conf lsmr)
11051134
@@ -1212,17 +1241,17 @@ mergeAcc mt = foldl (updateAcc com) Nothing . catMaybes
12121241--
12131242-- In the real implementation, this is done not on an individual 'LookupAcc',
12141243-- but one for each key, i.e. @Vector (Maybe Entry)@.
1215- doLookup :: Buffer -> [Run ] -> UnionLevel s -> Key -> ST s (LookupResult Value Blob )
1244+ doLookup :: WriteBuffer -> [Run ] -> UnionLevel s -> Key -> ST s (LookupResult Value Blob )
12161245doLookup wb runs ul k = do
1217- let acc0 = lookupBatch (Map. lookup k wb) k runs
1246+ let acc0 = lookupBatch (lookupWriteBuffer k wb) k runs
12181247 case ul of
12191248 NoUnion ->
12201249 pure (convertAcc acc0)
12211250 Union tree _ -> do
12221251 treeBatches <- buildLookupTree tree
12231252 let treeResults = lookupBatch Nothing k <$> treeBatches
12241253 pure $ convertAcc $ foldLookupTree $
1225- if null wb && null runs
1254+ if writeBufferSize wb == 0 && null runs
12261255 then treeResults
12271256 else LookupNode MergeLevel [LookupBatch acc0, treeResults ]
12281257 where
@@ -1239,7 +1268,7 @@ doLookup wb runs ul k = do
12391268-- In a real implementation, this would take all keys at once and be in IO.
12401269lookupBatch :: LookupAcc -> Key -> [Run ] -> LookupAcc
12411270lookupBatch acc k rs =
1242- let entries = [entry | r <- rs, Just entry <- [Map. lookup k r]]
1271+ let entries = [entry | r <- rs, Just entry <- [lookupRun k r]]
12431272 in foldl (updateAcc combine) acc entries
12441273
12451274data LookupTree a = LookupBatch a
@@ -1609,7 +1638,7 @@ newPendingLevelMerge [Single r] Nothing =
16091638newPendingLevelMerge [Merging {}] Nothing =
16101639 -- This case should never occur. If there is a single entry in the list,
16111640 -- there can only be one level in the input table. At level 1 there are no
1612- -- merging runs, so it must be a PreExistingRun .
1641+ -- merging runs, it must be a Single run flushed from a write buffer .
16131642 error " newPendingLevelMerge: singleton Merging run"
16141643newPendingLevelMerge irs tree = do
16151644 let prs = map incomingToPreExistingRun irs
@@ -1629,12 +1658,12 @@ newPendingUnionMerge trees = do
16291658
16301659contentToMergingTree :: LSMContent s -> ST s (Maybe (MergingTree s ))
16311660contentToMergingTree (LSMContent wb ls ul) =
1632- newPendingLevelMerge (buffers ++ levels) trees
1661+ newPendingLevelMerge (maybeToList buffer ++ levels) trees
16331662 where
16341663 -- flush the write buffer (but this should not modify the content)
1635- buffers
1636- | bufferSize wb == 0 = []
1637- | otherwise = [ Single (bufferToRun wb)]
1664+ buffer
1665+ | writeBufferSize wb == 0 = Nothing
1666+ | otherwise = Just ( Single (flushWriteBuffer wb))
16381667
16391668 levels = flip concatMap ls $ \ (Level ir rs) -> ir : map Single rs
16401669
@@ -1799,7 +1828,7 @@ data MTree r = MLeaf r
17991828 | MNode TreeMergeType [MTree r ]
18001829 deriving stock (Eq , Foldable , Functor , Show )
18011830
1802- allLevels :: LSM s -> ST s (Buffer , [[Run ]], Maybe (MTree Run ))
1831+ allLevels :: LSM s -> ST s (WriteBuffer , [[Run ]], Maybe (MTree Run ))
18031832allLevels (LSMHandle _ _ _conf lsmr) = do
18041833 LSMContent wb ls ul <- readSTRef lsmr
18051834 rs <- flattenLevels ls
@@ -1851,8 +1880,9 @@ logicalValue lsm = do
18511880 (wb, levels, tree) <- allLevels lsm
18521881 let r = mergek
18531882 MergeLevel
1854- (wb : concat levels ++ toList (mergeTree <$> tree))
1855- pure (Map. mapMaybe justInsert r)
1883+ (Run (bufferEntries wb) -- we don't flush, but treat wb as a run
1884+ : concat levels ++ toList (mergeTree <$> tree))
1885+ pure (Map. mapMaybe justInsert (runEntries r))
18561886 where
18571887 mergeTree :: MTree Run -> Run
18581888 mergeTree (MLeaf r) = r
@@ -1862,7 +1892,7 @@ logicalValue lsm = do
18621892 justInsert Delete = Nothing
18631893 justInsert (Mupsert v) = Just (v, Nothing )
18641894
1865- type Representation = (Run , [LevelRepresentation ], Maybe (MTree Run ))
1895+ type Representation = (WriteBuffer , [LevelRepresentation ], Maybe (MTree Run ))
18661896
18671897type LevelRepresentation =
18681898 (Maybe (MergePolicyForLevel , NominalDebt , NominalCredit ,
@@ -1892,25 +1922,23 @@ dumpLevel (Level (Merging mp nd ncv (MergingRun mt _ ref)) rs) = do
18921922representationShape :: Representation
18931923 -> (Int , [([Int ], [Int ])], Maybe (MTree Int ))
18941924representationShape (wb, levels, tree) =
1895- (summaryRun wb, map summaryLevel levels, fmap (fmap summaryRun ) tree)
1925+ (writeBufferSize wb, map summaryLevel levels, fmap (fmap runSize ) tree)
18961926 where
18971927 summaryLevel (mmr, rs) =
18981928 let (ongoing, complete) = summaryMR mmr
1899- in (ongoing, complete <> map summaryRun rs)
1900-
1901- summaryRun = runSize
1929+ in (ongoing, complete <> map runSize rs)
19021930
19031931 summaryMR = \ case
19041932 Nothing -> ([] , [] )
1905- Just (_, _, _, _, CompletedMerge r) -> ([] , [summaryRun r])
1906- Just (_, _, _, _, OngoingMerge _ rs _) -> (map summaryRun rs, [] )
1933+ Just (_, _, _, _, CompletedMerge r) -> ([] , [runSize r])
1934+ Just (_, _, _, _, OngoingMerge _ rs _) -> (map runSize rs, [] )
19071935
19081936-------------------------------------------------------------------------------
19091937-- Tracing
19101938--
19111939
19121940-- TODO: these events are incomplete, in particular we should also trace what
1913- -- happens in the union level.
1941+ -- happens in the union level. Somewhat related: issue #445.
19141942data Event =
19151943 NewTableEvent TableId LSMConfig
19161944 | UpdateEvent TableId Key Entry
@@ -1974,8 +2002,26 @@ instance (QC.Arbitrary v, QC.Arbitrary b) => QC.Arbitrary (Update v b) where
19742002 , (1 , pure Delete )
19752003 ]
19762004
2005+ instance QC. Arbitrary Run where
2006+ arbitrary = Run <$> QC. arbitrary
2007+ shrink = map Run . QC. shrink . runEntries
2008+
19772009instance QC. Arbitrary LevelMergeType where
19782010 arbitrary = QC. elements [MergeMidLevel , MergeLastLevel ]
19792011
19802012instance QC. Arbitrary TreeMergeType where
19812013 arbitrary = QC. elements [MergeLevel , MergeUnion ]
2014+
2015+ instance QC. Arbitrary LSMConfig where
2016+ arbitrary = do
2017+ configMaxWriteBufferSize <- QC. chooseInt (1 , 10 )
2018+ configSizeRatio <- QC. chooseInt (2 , 8 )
2019+ pure LSMConfig {configMaxWriteBufferSize, configSizeRatio}
2020+ shrink (LSMConfig size ratio) =
2021+ [ LSMConfig size' ratio'
2022+ | (size', ratio') <- QC. liftShrink2 QC. shrink shrinkSizeRatio (size, ratio)
2023+ , size' >= 1
2024+ ]
2025+ where
2026+ shrinkSizeRatio 4 = []
2027+ shrinkSizeRatio _ = [4 ] -- try shrinking to four, the default ratio
0 commit comments