diff --git a/Data/Pool.hs b/Data/Pool.hs index 6764e8b..ea409d2 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -32,6 +32,7 @@ module Data.Pool Pool(idleTime, maxResources, numStripes) , LocalPool , createPool + , createPoolWithMaxReuseCount , withResource , takeResource , tryWithResource @@ -45,7 +46,7 @@ import Control.Applicative ((<$>)) import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay) import Control.Concurrent.STM import Control.Exception (SomeException, onException, mask_) -import Control.Monad (forM_, forever, join, liftM3, unless, when) +import Control.Monad (forM_, forever, join, liftM5, unless, when) import Data.Hashable (hash) import Data.IORef (IORef, newIORef, mkWeakIORef) import Data.List (partition) @@ -86,6 +87,10 @@ data LocalPool a = LocalPool { -- ^ Count of open entries (both idle and in use). , entries :: TVar [Entry a] -- ^ Idle entries. + , increment :: a -> a + -- ^ Function that increments reuse count. + , isReusable :: a -> Bool + -- ^ Function that returns if a resource is reusable or not. , lfin :: IORef () -- ^ empty value used to attach a finalizer to (internal) } deriving (Typeable) @@ -150,8 +155,70 @@ createPool -- Requests for resources will block if this limit is reached on a -- single stripe, even if other stripes have idle resources -- available. - -> IO (Pool a) -createPool create destroy numStripes idleTime maxResources = do + -> IO (Pool a) +createPool create destroy numStripes idleTime maxResources = + createPool' create destroy numStripes idleTime maxResources id (const True) + +createPoolWithMaxReuseCount + :: IO a + -- ^ Action that creates a new resource. + -> (a -> IO ()) + -- ^ Action that destroys an existing resource. + -> Int + -- ^ Stripe count. The number of distinct sub-pools to maintain. + -- The smallest acceptable value is 1. + -> NominalDiffTime + -- ^ Amount of time for which an unused resource is kept open. + -- The smallest acceptable value is 0.5 seconds. + -- + -- The elapsed time before destroying a resource may be a little + -- longer than requested, as the reaper thread wakes at 1-second + -- intervals. + -> Int + -- ^ Maximum number of resources to keep open per stripe. The + -- smallest acceptable value is 1. + -- + -- Requests for resources will block if this limit is reached on a + -- single stripe, even if other stripes have idle resources + -- available. + -> Int + -- ^ Maxium reuse count. The smallest acceptable value is 1. + -> IO (Pool (a, Int)) +createPoolWithMaxReuseCount create destroy numStripes idleTime maxResources maxReuseCount = do + let create' = do x <- create; return (x, 0) + destroy' (x, _) = destroy x + increment (x, n) = (x, n + 1) + isReusable (_, n) = n < maxReuseCount + createPool' create' destroy' numStripes idleTime maxResources increment isReusable + +createPool' + :: IO a + -- ^ Action that creates a new resource. + -> (a -> IO ()) + -- ^ Action that destroys an existing resource. + -> Int + -- ^ Stripe count. The number of distinct sub-pools to maintain. + -- The smallest acceptable value is 1. + -> NominalDiffTime + -- ^ Amount of time for which an unused resource is kept open. + -- The smallest acceptable value is 0.5 seconds. + -- + -- The elapsed time before destroying a resource may be a little + -- longer than requested, as the reaper thread wakes at 1-second + -- intervals. + -> Int + -- ^ Maximum number of resources to keep open per stripe. The + -- smallest acceptable value is 1. + -- + -- Requests for resources will block if this limit is reached on a + -- single stripe, even if other stripes have idle resources + -- available. + -> (a -> a) + -- ^ Function that increments reuse count. + -> (a -> Bool) + -- ^ Function that returns if a resource is reusable or not. + -> IO (Pool a) +createPool' create destroy numStripes idleTime maxResources increment isReusable = do when (numStripes < 1) $ modError "pool " $ "invalid stripe count " ++ show numStripes when (idleTime < 0.5) $ @@ -159,7 +226,7 @@ createPool create destroy numStripes idleTime maxResources = do when (maxResources < 1) $ modError "pool " $ "invalid maximum resource count " ++ show maxResources localPools <- V.replicateM numStripes $ - liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ()) + liftM5 LocalPool (newTVarIO 0) (newTVarIO []) (return increment) (return isReusable) (newIORef ()) reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask -> unmask $ reaper destroy idleTime localPools fin <- newIORef () @@ -206,8 +273,8 @@ reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO () reaper destroy idleTime pools = forever $ do threadDelay (1 * 1000000) now <- getCurrentTime - let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime V.forM_ pools $ \LocalPool{..} -> do + let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime || not (isReusable entry) resources <- atomically $ do (stale,fresh) <- partition isStale <$> readTVar entries unless (null stale) $ do @@ -276,9 +343,10 @@ takeResource :: Pool a -> IO (a, LocalPool a) takeResource pool@Pool{..} = do local@LocalPool{..} <- getLocalPool pool resource <- liftBase . join . atomically $ do - ents <- readTVar entries + ents' <- readTVar entries + let (olds, ents) = span (not . isReusable . entry) ents' case ents of - (Entry{..}:es) -> writeTVar entries es >> return (return entry) + (Entry{..}:es) -> writeTVar entries (olds ++ es) >> return (return entry) [] -> do used <- readTVar inUse when (used == maxResources) retry @@ -322,9 +390,10 @@ tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a)) tryTakeResource pool@Pool{..} = do local@LocalPool{..} <- getLocalPool pool resource <- liftBase . join . atomically $ do - ents <- readTVar entries + ents' <- readTVar entries + let (olds, ents) = span (not . isReusable . entry) ents' case ents of - (Entry{..}:es) -> writeTVar entries es >> return (return . Just $ entry) + (Entry{..}:es) -> writeTVar entries (olds ++ es) >> return (return . Just $ entry) [] -> do used <- readTVar inUse if used == maxResources @@ -363,7 +432,7 @@ destroyResource Pool{..} LocalPool{..} resource = do putResource :: LocalPool a -> a -> IO () putResource LocalPool{..} resource = do now <- getCurrentTime - atomically $ modifyTVar_ entries (Entry resource now:) + atomically $ modifyTVar_ entries (Entry (increment resource) now:) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE putResource #-} #endif